This is an automated email from the ASF dual-hosted git repository.
milenkovicm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new 34f75131a feat: make query parameter optional in tpch benchmark (#1391)
34f75131a is described below
commit 34f75131a273e953ffbbc31aa48d6c60fade7dd9
Author: Andy Grove <[email protected]>
AuthorDate: Sat Jan 17 15:50:38 2026 -0700
feat: make query parameter optional in tpch benchmark (#1391)
When running the tpch benchmark, the --query parameter is now optional.
If not specified, all 22 TPC-H queries will be run sequentially.
Changes:
- Make --query optional for both datafusion and ballista benchmarks
- Run all 22 queries when --query is not specified
- Only print SQL queries when --debug flag is enabled
- Write a single JSON output file for the entire benchmark run
- Fix parquet file path resolution for datafusion benchmarks
- Simplify output when iterations=1 (no iteration number, no average)
Co-authored-by: Claude Opus 4.5 <[email protected]>
---
benchmarks/src/bin/tpch.rs | 271 +++++++++++++++++++++++++++++----------------
1 file changed, 174 insertions(+), 97 deletions(-)
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index d1401a658..4bfc133ef 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -28,11 +28,14 @@ use datafusion::datasource::{MemTable, TableProvider};
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::SessionStateBuilder;
use datafusion::execution::context::SessionState;
+#[cfg(test)]
use datafusion::logical_expr::LogicalPlan;
use datafusion::logical_expr::{Expr, expr::Cast};
use datafusion::parquet::basic::Compression;
use datafusion::parquet::file::properties::WriterProperties;
+#[cfg(test)]
use datafusion::physical_plan::display::DisplayableExecutionPlan;
+#[cfg(test)]
use datafusion::physical_plan::{collect, displayable};
use datafusion::prelude::*;
use datafusion::{
@@ -60,6 +63,7 @@ use std::{
time::{Instant, SystemTime},
};
use structopt::StructOpt;
+#[cfg(test)]
use tokio::task::JoinHandle;
#[cfg(feature = "snmalloc")]
@@ -72,9 +76,9 @@ static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;
#[derive(Debug, StructOpt, Clone)]
struct BallistaBenchmarkOpt {
- /// Query number
+ /// Query number (1-22). If not specified, runs all queries.
#[structopt(short, long)]
- query: usize,
+ query: Option<usize>,
/// Activate debug mode to see query results
#[structopt(short, long)]
@@ -122,9 +126,9 @@ struct BallistaBenchmarkOpt {
#[derive(Debug, StructOpt, Clone)]
struct DataFusionBenchmarkOpt {
- /// Query number
+ /// Query number (1-22). If not specified, runs all queries.
#[structopt(short, long)]
- query: usize,
+ query: Option<usize>,
/// Activate debug mode to see query results
#[structopt(short, long)]
@@ -283,7 +287,6 @@ async fn main() -> Result<()> {
#[allow(clippy::await_holding_lock)]
async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordBatch>> {
println!("Running benchmarks with the following options: {opt:?}");
- let mut benchmark_run = BenchmarkRun::new(opt.query);
let config = SessionConfig::new()
.with_target_partitions(opt.partitions)
.with_batch_size(opt.batch_size);
@@ -319,30 +322,65 @@ async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordB
}
}
- let mut millis = vec![];
- // run benchmark
+ // Determine which queries to run
+ let query_numbers: Vec<usize> = opt
+ .query
+ .map(|q| vec![q])
+ .unwrap_or_else(|| (1..=22).collect());
+
+ let mut benchmark_run = BenchmarkRun::new();
let mut result: Vec<RecordBatch> = Vec::with_capacity(1);
- for i in 0..opt.iterations {
- let start = Instant::now();
- let plans = create_logical_plans(&ctx, opt.query).await?;
- for plan in plans {
- result = execute_query(&ctx, &plan, opt.debug).await?;
+
+ for query in query_numbers {
+ let mut query_run = QueryRun::new(query);
+ let mut millis = vec![];
+
+ // run benchmark
+ let sqls = get_query_sql(query)?;
+ if opt.debug {
+ println!("Query {query}:\n{sqls:?}");
+ }
+ for i in 0..opt.iterations {
+ let start = Instant::now();
+            // Execute each SQL statement sequentially (required for queries like q15
+ // that create views and then reference them)
+ for sql in &sqls {
+ if opt.debug {
+ println!("Executing: {sql}");
+ }
+ let df = ctx.sql(sql).await?;
+ result = df.collect().await?;
+ }
+ let elapsed = start.elapsed().as_secs_f64() * 1000.0;
+ if opt.debug {
+ pretty::print_batches(&result)?;
+ }
+ millis.push(elapsed);
+ let row_count = result.iter().map(|b| b.num_rows()).sum();
+ if opt.iterations == 1 {
+ println!(
+ "Query {} took {:.1} ms and returned {} rows",
+ query, elapsed, row_count
+ );
+ } else {
+ println!(
+ "Query {} iteration {} took {:.1} ms and returned {} rows",
+ query, i, elapsed, row_count
+ );
+ }
+ query_run.add_result(elapsed, row_count);
}
- let elapsed = start.elapsed().as_secs_f64() * 1000.0;
- millis.push(elapsed);
- let row_count = result.iter().map(|b| b.num_rows()).sum();
- println!(
- "Query {} iteration {} took {:.1} ms and returned {} rows",
- opt.query, i, elapsed, row_count
- );
- benchmark_run.add_result(elapsed, row_count);
- }
- let avg = millis.iter().sum::<f64>() / millis.len() as f64;
- println!("Query {} avg time: {:.2} ms", opt.query, avg);
+ if opt.iterations > 1 {
+ let avg = millis.iter().sum::<f64>() / millis.len() as f64;
+ println!("Query {} avg time: {:.1} ms", query, avg);
+ }
+
+ benchmark_run.add_query_run(query_run);
+ }
if let Some(path) = &opt.output_path {
- write_summary_json(&mut benchmark_run, path)?;
+ write_summary_json(&benchmark_run, path)?;
}
Ok(result)
@@ -350,93 +388,112 @@ async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordB
async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> {
println!("Running benchmarks with the following options: {opt:?}");
- let mut benchmark_run = BenchmarkRun::new(opt.query);
- let config = SessionConfig::new_with_ballista()
- .with_target_partitions(opt.partitions)
-        .with_ballista_job_name(&format!("Query derived from TPC-H q{}", opt.query))
- .with_batch_size(opt.batch_size)
- .with_collect_statistics(true);
-
- let state = SessionStateBuilder::new()
- .with_default_features()
- .with_config(config)
- .build();
let address = format!(
"df://{}:{}",
opt.host.clone().unwrap().as_str(),
opt.port.unwrap()
);
- let ctx = SessionContext::remote_with_state(&address, state).await?;
- // register tables with Ballista context
- let path = opt.path.to_str().unwrap();
- let file_format = opt.file_format.as_str();
+ // Determine which queries to run
+ let query_numbers: Vec<usize> = opt
+ .query
+ .map(|q| vec![q])
+ .unwrap_or_else(|| (1..=22).collect());
- register_tables(path, file_format, &ctx, opt.debug).await?;
+ let mut benchmark_run = BenchmarkRun::new();
- let mut millis = vec![];
+ for query in query_numbers {
+ let mut query_run = QueryRun::new(query);
- // run benchmark
- let queries = get_query_sql(opt.query)?;
- println!(
- "Running benchmark with queries {}:\n {:?}",
- opt.query, queries
- );
- let mut batches = vec![];
- for i in 0..opt.iterations {
- let start = Instant::now();
- for sql in &queries {
- let df = ctx
- .sql(sql)
- .await
- .map_err(|e| DataFusionError::Plan(format!("{e:?}")))
- .unwrap();
- let plan = df.clone().into_optimized_plan()?;
+ let config = SessionConfig::new_with_ballista()
+ .with_target_partitions(opt.partitions)
+            .with_ballista_job_name(&format!("Query derived from TPC-H q{}", query))
+ .with_batch_size(opt.batch_size)
+ .with_collect_statistics(true);
+
+ let state = SessionStateBuilder::new()
+ .with_default_features()
+ .with_config(config)
+ .build();
+ let ctx = SessionContext::remote_with_state(&address, state).await?;
+
+ // register tables with Ballista context
+ let path = opt.path.to_str().unwrap();
+ let file_format = opt.file_format.as_str();
+
+ register_tables(path, file_format, &ctx, opt.debug).await?;
+
+ let mut millis = vec![];
+
+ // run benchmark
+ let sqls = get_query_sql(query)?;
+ if opt.debug {
+ println!("Running benchmark with query {}:\n {:?}", query, sqls);
+ }
+ let mut batches = vec![];
+ for i in 0..opt.iterations {
+ let start = Instant::now();
+ for sql in &sqls {
+ let df = ctx
+ .sql(sql)
+ .await
+ .map_err(|e| DataFusionError::Plan(format!("{e:?}")))
+ .unwrap();
+ let plan = df.clone().into_optimized_plan()?;
+ if opt.debug {
+ println!("=== Optimized logical plan ===\n{plan:?}\n");
+ }
+ batches = df
+ .collect()
+ .await
+ .map_err(|e| DataFusionError::Plan(format!("{e:?}")))
+ .unwrap();
+ }
+ let elapsed = start.elapsed().as_secs_f64() * 1000.0;
+ millis.push(elapsed);
+ let row_count = batches.iter().map(|b| b.num_rows()).sum();
+ if opt.iterations == 1 {
+ println!(
+ "Query {} took {:.1} ms and returned {} rows",
+ query, elapsed, row_count
+ );
+ } else {
+ println!(
+ "Query {} iteration {} took {:.1} ms and returned {} rows",
+ query, i, elapsed, row_count
+ );
+ }
+ query_run.add_result(elapsed, row_count);
if opt.debug {
- println!("=== Optimized logical plan ===\n{plan:?}\n");
+ pretty::print_batches(&batches)?;
+ }
+
+            if let Some(expected_results_path) = opt.expected_results.as_ref() {
+                let expected = get_expected_results(query, expected_results_path).await?;
+ assert_expected_results(&expected, &batches)
}
- batches = df
- .collect()
- .await
- .map_err(|e| DataFusionError::Plan(format!("{e:?}")))
- .unwrap();
- }
- let elapsed = start.elapsed().as_secs_f64() * 1000.0;
- millis.push(elapsed);
- let row_count = batches.iter().map(|b| b.num_rows()).sum();
- println!(
- "Query {} iteration {} took {:.1} ms and returned {} rows",
- opt.query, i, elapsed, row_count
- );
- benchmark_run.add_result(elapsed, row_count);
- if opt.debug {
- pretty::print_batches(&batches)?;
}
- if let Some(expected_results_path) = opt.expected_results.as_ref() {
-            let expected = get_expected_results(opt.query, expected_results_path).await?;
- assert_expected_results(&expected, &batches)
+ if opt.iterations > 1 {
+ let avg = millis.iter().sum::<f64>() / millis.len() as f64;
+ println!("Query {} avg time: {:.1} ms", query, avg);
}
- }
- let avg = millis.iter().sum::<f64>() / millis.len() as f64;
- println!("Query {} avg time: {:.2} ms", opt.query, avg);
+ benchmark_run.add_query_run(query_run);
+ }
if let Some(path) = &opt.output_path {
- write_summary_json(&mut benchmark_run, path)?;
+ write_summary_json(&benchmark_run, path)?;
}
Ok(())
}
-fn write_summary_json(benchmark_run: &mut BenchmarkRun, path: &Path) -> Result<()> {
+fn write_summary_json(benchmark_run: &BenchmarkRun, path: &Path) -> Result<()> {
     let json =
         serde_json::to_string_pretty(&benchmark_run).expect("summary is serializable");
- let filename = format!(
- "tpch-q{}-{}.json",
- benchmark_run.query, benchmark_run.start_time
- );
+ let filename = format!("tpch-{}.json", benchmark_run.start_time);
let path = path.join(filename);
println!(
"Writing summary file to {}",
@@ -662,6 +719,7 @@ fn get_query_sql(query: usize) -> Result<Vec<String>> {
}
/// Create a logical plan for each query in the specified query file
+#[cfg(test)]
async fn create_logical_plans(
ctx: &SessionContext,
query: usize,
@@ -687,6 +745,7 @@ async fn create_logical_plans(
.collect()
}
+#[cfg(test)]
async fn execute_query(
ctx: &SessionContext,
plan: &LogicalPlan,
@@ -833,7 +892,7 @@ async fn get_table(
)
}
"parquet" => {
- let path = format!("{path}/{table}");
+ let path = find_path(path, table, "parquet")?;
let format = ParquetFormat::default().with_enable_pruning(true);
(
@@ -971,6 +1030,27 @@ pub fn get_tbl_tpch_table_schema(table: &str) -> Schema {
schema.finish()
}
+#[derive(Debug, Serialize)]
+struct QueryRun {
+ /// query number
+ query: usize,
+ /// list of individual run times and row counts
+ iterations: Vec<QueryResult>,
+}
+
+impl QueryRun {
+ fn new(query: usize) -> Self {
+ Self {
+ query,
+ iterations: vec![],
+ }
+ }
+
+ fn add_result(&mut self, elapsed: f64, row_count: usize) {
+ self.iterations.push(QueryResult { elapsed, row_count })
+ }
+}
+
#[derive(Debug, Serialize)]
struct BenchmarkRun {
/// Benchmark crate version
@@ -983,14 +1063,12 @@ struct BenchmarkRun {
start_time: u64,
/// CLI arguments
arguments: Vec<String>,
- /// query number
- query: usize,
- /// list of individual run times and row counts
- iterations: Vec<QueryResult>,
+ /// Results for each query
+ queries: Vec<QueryRun>,
}
impl BenchmarkRun {
- fn new(query: usize) -> Self {
+ fn new() -> Self {
Self {
benchmark_version: env!("CARGO_PKG_VERSION").to_owned(),
datafusion_version: DATAFUSION_VERSION.to_owned(),
@@ -1000,13 +1078,12 @@ impl BenchmarkRun {
.expect("current time is later than UNIX_EPOCH")
.as_secs(),
arguments: std::env::args().skip(1).collect::<Vec<String>>(),
- query,
- iterations: vec![],
+ queries: vec![],
}
}
- fn add_result(&mut self, elapsed: f64, row_count: usize) {
- self.iterations.push(QueryResult { elapsed, row_count })
+ fn add_query_run(&mut self, query_run: QueryRun) {
+ self.queries.push(query_run)
}
}
@@ -1637,7 +1714,7 @@ mod tests {
// run the query to compute actual results of the query
let opt = DataFusionBenchmarkOpt {
- query: n,
+ query: Some(n),
debug: false,
iterations: 1,
partitions: 2,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]