This is an automated email from the ASF dual-hosted git repository.
houqp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new d6d90e9 Add load test command in tpch.rs. (#1530)
d6d90e9 is described below
commit d6d90e93117293adfa7aa6f4a93bd796665c28a3
Author: Yang <[email protected]>
AuthorDate: Mon Jan 10 03:11:54 2022 +0800
Add load test command in tpch.rs. (#1530)
---
benchmarks/Cargo.toml | 1 +
benchmarks/README.md | 15 +++
benchmarks/src/bin/tpch.rs | 239 +++++++++++++++++++++++++++++++++++++++------
3 files changed, 224 insertions(+), 31 deletions(-)
diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index c042778..d20de31 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -40,6 +40,7 @@ futures = "0.3"
env_logger = "0.9"
mimalloc = { version = "0.1", optional = true, default-features = false }
snmalloc-rs = {version = "0.2", optional = true, features= ["cache-friendly"] }
+rand = "0.8.4"
[dev-dependencies]
ballista-core = { path = "../ballista/rust/core" }
diff --git a/benchmarks/README.md b/benchmarks/README.md
index a63761b..e6c1743 100644
--- a/benchmarks/README.md
+++ b/benchmarks/README.md
@@ -178,5 +178,20 @@ Query 'fare_amt_by_passenger' iteration 1 took 7599 ms
Query 'fare_amt_by_passenger' iteration 2 took 7969 ms
```
+## Running the Ballista Loadtest
+
+```bash
   cargo run --bin tpch -- loadtest ballista-load \
    --query-list 1,3,5,6,7,10,12,13 \
    --requests 200 \
    --concurrency 10 \
    --data-path /**** \
    --format parquet \
    --host localhost \
    --port 50050 \
    --sql-path /*** \
    --debug
+```
+
[1]: http://www.tpc.org/tpch/
[2]: https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 71e68b6..d9317fe 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -17,6 +17,9 @@
//! Benchmark derived from TPC-H. This is not an official TPC-H benchmark.
+use futures::future::join_all;
+use rand::prelude::*;
+use std::ops::Div;
use std::{
fs,
iter::Iterator,
@@ -137,6 +140,48 @@ struct DataFusionBenchmarkOpt {
mem_table: bool,
}
+#[derive(Debug, StructOpt, Clone)]
+struct BallistaLoadtestOpt {
+ #[structopt(short = "q", long)]
+ query_list: String,
+
+ /// Activate debug mode to see query results
+ #[structopt(short, long)]
+ debug: bool,
+
+ /// Number of requests
+ #[structopt(short = "r", long = "requests", default_value = "100")]
+ requests: usize,
+
+ /// Number of connections
+ #[structopt(short = "c", long = "concurrency", default_value = "5")]
+ concurrency: usize,
+
+ /// Number of partitions to process in parallel
+ #[structopt(short = "n", long = "partitions", default_value = "2")]
+ partitions: usize,
+
+ /// Path to data files
+    #[structopt(parse(from_os_str), required = true, short = "p", long = "data-path")]
+ path: PathBuf,
+
+ /// Path to sql files
+ #[structopt(parse(from_os_str), required = true, long = "sql-path")]
+ sql_path: PathBuf,
+
+ /// File format: `csv` or `parquet`
+ #[structopt(short = "f", long = "format", default_value = "parquet")]
+ file_format: String,
+
+ /// Ballista executor host
+ #[structopt(long = "host")]
+ host: Option<String>,
+
+ /// Ballista executor port
+ #[structopt(long = "port")]
+ port: Option<u16>,
+}
+
#[derive(Debug, StructOpt)]
struct ConvertOpt {
/// Path to csv files
@@ -174,10 +219,18 @@ enum BenchmarkSubCommandOpt {
}
#[derive(Debug, StructOpt)]
+#[structopt(about = "loadtest command")]
+enum LoadtestOpt {
+ #[structopt(name = "ballista-load")]
+ BallistaLoadtest(BallistaLoadtestOpt),
+}
+
+#[derive(Debug, StructOpt)]
#[structopt(name = "TPC-H", about = "TPC-H Benchmarks.")]
enum TpchOpt {
Benchmark(BenchmarkSubCommandOpt),
Convert(ConvertOpt),
+ Loadtest(LoadtestOpt),
}
const TABLES: &[&str] = &[
@@ -187,6 +240,7 @@ const TABLES: &[&str] = &[
#[tokio::main]
async fn main() -> Result<()> {
use BenchmarkSubCommandOpt::*;
+ use LoadtestOpt::*;
env_logger::init();
match TpchOpt::from_args() {
@@ -197,6 +251,9 @@ async fn main() -> Result<()> {
benchmark_datafusion(opt).await.map(|_| ())
}
TpchOpt::Convert(opt) => convert_tbl(opt).await,
+ TpchOpt::Loadtest(BallistaLoadtest(opt)) => {
+ loadtest_ballista(opt).await.map(|_| ())
+ }
}
}
@@ -268,6 +325,151 @@ async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> {
// register tables with Ballista context
let path = opt.path.to_str().unwrap();
let file_format = opt.file_format.as_str();
+
+ register_tables(path, file_format, &ctx).await;
+
+ let mut millis = vec![];
+
+ // run benchmark
+ let sql = get_query_sql(opt.query)?;
+ println!("Running benchmark with query {}:\n {}", opt.query, sql);
+ for i in 0..opt.iterations {
+ let start = Instant::now();
+ let df = ctx
+ .sql(&sql)
+ .await
+ .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))
+ .unwrap();
+ let batches = df
+ .collect()
+ .await
+ .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))
+ .unwrap();
+ let elapsed = start.elapsed().as_secs_f64() * 1000.0;
+ millis.push(elapsed as f64);
+ println!("Query {} iteration {} took {:.1} ms", opt.query, i, elapsed);
+ if opt.debug {
+ pretty::print_batches(&batches)?;
+ }
+ }
+
+ let avg = millis.iter().sum::<f64>() / millis.len() as f64;
+ println!("Query {} avg time: {:.2} ms", opt.query, avg);
+
+ Ok(())
+}
+
+async fn loadtest_ballista(opt: BallistaLoadtestOpt) -> Result<()> {
+ println!(
+ "Running loadtest_ballista with the following options: {:?}",
+ opt
+ );
+
+ let config = BallistaConfig::builder()
+ .set(
+ BALLISTA_DEFAULT_SHUFFLE_PARTITIONS,
+ &format!("{}", opt.partitions),
+ )
+ .build()
+ .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
+
+ let concurrency = opt.concurrency;
+ let request_amount = opt.requests;
+ let mut clients = vec![];
+
+ for _num in 0..concurrency {
+ clients.push(BallistaContext::remote(
+ opt.host.clone().unwrap().as_str(),
+ opt.port.unwrap(),
+ &config,
+ ));
+ }
+
+ // register tables with Ballista context
+ let path = opt.path.to_str().unwrap();
+ let file_format = opt.file_format.as_str();
+ let sql_path = opt.sql_path.to_str().unwrap().to_string();
+
+ for ctx in &clients {
+ register_tables(path, file_format, ctx).await;
+ }
+
+ let request_per_thread = request_amount.div(concurrency);
+ // run benchmark
+ let query_list: Vec<usize> = opt
+ .query_list
+ .split(',')
+ .map(|s| s.parse().unwrap())
+ .collect();
+ println!("query list: {:?} ", &query_list);
+
+ let total = Instant::now();
+ let mut futures = vec![];
+
+ for (client_id, client) in clients.into_iter().enumerate() {
+ let query_list_clone = query_list.clone();
+ let sql_path_clone = sql_path.clone();
+ let handle = tokio::spawn(async move {
+ for i in 0..request_per_thread {
+ let query_id = query_list_clone
+ .get(
+ (0..query_list_clone.len())
+ .choose(&mut rand::thread_rng())
+ .unwrap(),
+ )
+ .unwrap();
+ let sql =
+                    get_query_sql_by_path(query_id.to_owned(), sql_path_clone.clone())
+ .unwrap();
+ println!(
+ "Client {} Round {} Query {} started",
+ &client_id, &i, query_id
+ );
+ let start = Instant::now();
+ let df = client
+ .sql(&sql)
+ .await
+ .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))
+ .unwrap();
+ let batches = df
+ .collect()
+ .await
+ .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))
+ .unwrap();
+ let elapsed = start.elapsed().as_secs_f64() * 1000.0;
+ println!(
+ "Client {} Round {} Query {} took {:.1} ms ",
+ &client_id, &i, query_id, elapsed
+ );
+ if opt.debug {
+ pretty::print_batches(&batches).unwrap();
+ }
+ }
+ });
+ futures.push(handle);
+ }
+ join_all(futures).await;
+ let elapsed = total.elapsed().as_secs_f64() * 1000.0;
+ println!("###############################");
+ println!("load test took {:.1} ms", elapsed);
+ Ok(())
+}
+
+fn get_query_sql_by_path(query: usize, mut sql_path: String) -> Result<String> {
+ if sql_path.ends_with('/') {
+ sql_path.pop();
+ }
+ if query > 0 && query < 23 {
+ let filename = format!("{}/q{}.sql", sql_path, query);
+ Ok(fs::read_to_string(&filename).expect("failed to read query"))
+ } else {
+ Err(DataFusionError::Plan(
+ "invalid query. Expected value between 1 and 22".to_owned(),
+ ))
+ }
+}
+
+async fn register_tables(path: &str, file_format: &str, ctx: &BallistaContext) {
for table in TABLES {
match file_format {
// dbgen creates .tbl ('|' delimited) files without header
@@ -281,7 +483,8 @@ async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> {
.file_extension(".tbl");
ctx.register_csv(table, &path, options)
.await
- .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))?;
+ .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))
+ .unwrap();
}
"csv" => {
let path = format!("{}/{}", path, table);
@@ -289,47 +492,21 @@ async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> {
let options =
CsvReadOptions::new().schema(&schema).has_header(true);
ctx.register_csv(table, &path, options)
.await
- .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))?;
+ .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))
+ .unwrap();
}
"parquet" => {
let path = format!("{}/{}", path, table);
ctx.register_parquet(table, &path)
.await
- .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))?;
+ .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))
+ .unwrap();
}
other => {
unimplemented!("Invalid file format '{}'", other);
}
}
}
-
- let mut millis = vec![];
-
- // run benchmark
- let sql = get_query_sql(opt.query)?;
- println!("Running benchmark with query {}:\n {}", opt.query, sql);
- for i in 0..opt.iterations {
- let start = Instant::now();
- let df = ctx
- .sql(&sql)
- .await
- .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))?;
- let batches = df
- .collect()
- .await
- .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))?;
- let elapsed = start.elapsed().as_secs_f64() * 1000.0;
- millis.push(elapsed as f64);
- println!("Query {} iteration {} took {:.1} ms", opt.query, i, elapsed);
- if opt.debug {
- pretty::print_batches(&batches)?;
- }
- }
-
- let avg = millis.iter().sum::<f64>() / millis.len() as f64;
- println!("Query {} avg time: {:.2} ms", opt.query, avg);
-
- Ok(())
}
fn get_query_sql(query: usize) -> Result<String> {