This is an automated email from the ASF dual-hosted git repository.

houqp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new d6d90e9  Add load test command in tpch.rs. (#1530)
d6d90e9 is described below

commit d6d90e93117293adfa7aa6f4a93bd796665c28a3
Author: Yang <[email protected]>
AuthorDate: Mon Jan 10 03:11:54 2022 +0800

    Add load test command in tpch.rs. (#1530)
---
 benchmarks/Cargo.toml      |   1 +
 benchmarks/README.md       |  15 +++
 benchmarks/src/bin/tpch.rs | 239 +++++++++++++++++++++++++++++++++++++++------
 3 files changed, 224 insertions(+), 31 deletions(-)

diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index c042778..d20de31 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -40,6 +40,7 @@ futures = "0.3"
 env_logger = "0.9"
 mimalloc = { version = "0.1", optional = true, default-features = false }
 snmalloc-rs = {version = "0.2", optional = true, features= ["cache-friendly"] }
+rand = "0.8.4"
 
 [dev-dependencies]
 ballista-core = { path = "../ballista/rust/core" }
diff --git a/benchmarks/README.md b/benchmarks/README.md
index a63761b..e6c1743 100644
--- a/benchmarks/README.md
+++ b/benchmarks/README.md
@@ -178,5 +178,20 @@ Query 'fare_amt_by_passenger' iteration 1 took 7599 ms
 Query 'fare_amt_by_passenger' iteration 2 took 7969 ms
 ```
 
+## Running the Ballista Loadtest
+
+```bash
+ cargo run --bin tpch -- loadtest  ballista-load 
+  --query-list 1,3,5,6,7,10,12,13 
+  --requests 200 
+  --concurrency 10  
+  --data-path /**** 
+  --format parquet 
+  --host localhost 
+  --port 50050 
+  --sql-path /***
+  --debug
+```
+
 [1]: http://www.tpc.org/tpch/
 [2]: https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 71e68b6..d9317fe 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -17,6 +17,9 @@
 
 //! Benchmark derived from TPC-H. This is not an official TPC-H benchmark.
 
+use futures::future::join_all;
+use rand::prelude::*;
+use std::ops::Div;
 use std::{
     fs,
     iter::Iterator,
@@ -137,6 +140,48 @@ struct DataFusionBenchmarkOpt {
     mem_table: bool,
 }
 
+#[derive(Debug, StructOpt, Clone)]
+struct BallistaLoadtestOpt {
+    #[structopt(short = "q", long)]
+    query_list: String,
+
+    /// Activate debug mode to see query results
+    #[structopt(short, long)]
+    debug: bool,
+
+    /// Number of requests
+    #[structopt(short = "r", long = "requests", default_value = "100")]
+    requests: usize,
+
+    /// Number of connections
+    #[structopt(short = "c", long = "concurrency", default_value = "5")]
+    concurrency: usize,
+
+    /// Number of partitions to process in parallel
+    #[structopt(short = "n", long = "partitions", default_value = "2")]
+    partitions: usize,
+
+    /// Path to data files
+    #[structopt(parse(from_os_str), required = true, short = "p", long = "data-path")]
+    path: PathBuf,
+
+    /// Path to sql files
+    #[structopt(parse(from_os_str), required = true, long = "sql-path")]
+    sql_path: PathBuf,
+
+    /// File format: `csv` or `parquet`
+    #[structopt(short = "f", long = "format", default_value = "parquet")]
+    file_format: String,
+
+    /// Ballista executor host
+    #[structopt(long = "host")]
+    host: Option<String>,
+
+    /// Ballista executor port
+    #[structopt(long = "port")]
+    port: Option<u16>,
+}
+
 #[derive(Debug, StructOpt)]
 struct ConvertOpt {
     /// Path to csv files
@@ -174,10 +219,18 @@ enum BenchmarkSubCommandOpt {
 }
 
 #[derive(Debug, StructOpt)]
+#[structopt(about = "loadtest command")]
+enum LoadtestOpt {
+    #[structopt(name = "ballista-load")]
+    BallistaLoadtest(BallistaLoadtestOpt),
+}
+
+#[derive(Debug, StructOpt)]
 #[structopt(name = "TPC-H", about = "TPC-H Benchmarks.")]
 enum TpchOpt {
     Benchmark(BenchmarkSubCommandOpt),
     Convert(ConvertOpt),
+    Loadtest(LoadtestOpt),
 }
 
 const TABLES: &[&str] = &[
@@ -187,6 +240,7 @@ const TABLES: &[&str] = &[
 #[tokio::main]
 async fn main() -> Result<()> {
     use BenchmarkSubCommandOpt::*;
+    use LoadtestOpt::*;
 
     env_logger::init();
     match TpchOpt::from_args() {
@@ -197,6 +251,9 @@ async fn main() -> Result<()> {
             benchmark_datafusion(opt).await.map(|_| ())
         }
         TpchOpt::Convert(opt) => convert_tbl(opt).await,
+        TpchOpt::Loadtest(BallistaLoadtest(opt)) => {
+            loadtest_ballista(opt).await.map(|_| ())
+        }
     }
 }
 
@@ -268,6 +325,151 @@ async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> {
     // register tables with Ballista context
     let path = opt.path.to_str().unwrap();
     let file_format = opt.file_format.as_str();
+
+    register_tables(path, file_format, &ctx).await;
+
+    let mut millis = vec![];
+
+    // run benchmark
+    let sql = get_query_sql(opt.query)?;
+    println!("Running benchmark with query {}:\n {}", opt.query, sql);
+    for i in 0..opt.iterations {
+        let start = Instant::now();
+        let df = ctx
+            .sql(&sql)
+            .await
+            .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))
+            .unwrap();
+        let batches = df
+            .collect()
+            .await
+            .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))
+            .unwrap();
+        let elapsed = start.elapsed().as_secs_f64() * 1000.0;
+        millis.push(elapsed as f64);
+        println!("Query {} iteration {} took {:.1} ms", opt.query, i, elapsed);
+        if opt.debug {
+            pretty::print_batches(&batches)?;
+        }
+    }
+
+    let avg = millis.iter().sum::<f64>() / millis.len() as f64;
+    println!("Query {} avg time: {:.2} ms", opt.query, avg);
+
+    Ok(())
+}
+
+async fn loadtest_ballista(opt: BallistaLoadtestOpt) -> Result<()> {
+    println!(
+        "Running loadtest_ballista with the following options: {:?}",
+        opt
+    );
+
+    let config = BallistaConfig::builder()
+        .set(
+            BALLISTA_DEFAULT_SHUFFLE_PARTITIONS,
+            &format!("{}", opt.partitions),
+        )
+        .build()
+        .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
+
+    let concurrency = opt.concurrency;
+    let request_amount = opt.requests;
+    let mut clients = vec![];
+
+    for _num in 0..concurrency {
+        clients.push(BallistaContext::remote(
+            opt.host.clone().unwrap().as_str(),
+            opt.port.unwrap(),
+            &config,
+        ));
+    }
+
+    // register tables with Ballista context
+    let path = opt.path.to_str().unwrap();
+    let file_format = opt.file_format.as_str();
+    let sql_path = opt.sql_path.to_str().unwrap().to_string();
+
+    for ctx in &clients {
+        register_tables(path, file_format, ctx).await;
+    }
+
+    let request_per_thread = request_amount.div(concurrency);
+    // run benchmark
+    let query_list: Vec<usize> = opt
+        .query_list
+        .split(',')
+        .map(|s| s.parse().unwrap())
+        .collect();
+    println!("query list: {:?} ", &query_list);
+
+    let total = Instant::now();
+    let mut futures = vec![];
+
+    for (client_id, client) in clients.into_iter().enumerate() {
+        let query_list_clone = query_list.clone();
+        let sql_path_clone = sql_path.clone();
+        let handle = tokio::spawn(async move {
+            for i in 0..request_per_thread {
+                let query_id = query_list_clone
+                    .get(
+                        (0..query_list_clone.len())
+                            .choose(&mut rand::thread_rng())
+                            .unwrap(),
+                    )
+                    .unwrap();
+                let sql =
+                    get_query_sql_by_path(query_id.to_owned(), sql_path_clone.clone())
+                        .unwrap();
+                println!(
+                    "Client {} Round {} Query {} started",
+                    &client_id, &i, query_id
+                );
+                let start = Instant::now();
+                let df = client
+                    .sql(&sql)
+                    .await
+                    .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))
+                    .unwrap();
+                let batches = df
+                    .collect()
+                    .await
+                    .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))
+                    .unwrap();
+                let elapsed = start.elapsed().as_secs_f64() * 1000.0;
+                println!(
+                    "Client {} Round {} Query {} took {:.1} ms ",
+                    &client_id, &i, query_id, elapsed
+                );
+                if opt.debug {
+                    pretty::print_batches(&batches).unwrap();
+                }
+            }
+        });
+        futures.push(handle);
+    }
+    join_all(futures).await;
+    let elapsed = total.elapsed().as_secs_f64() * 1000.0;
+    println!("###############################");
+    println!("load test  took {:.1} ms", elapsed);
+    Ok(())
+}
+
+fn get_query_sql_by_path(query: usize, mut sql_path: String) -> Result<String> {
+    if sql_path.ends_with('/') {
+        sql_path.pop();
+    }
+    if query > 0 && query < 23 {
+        let filename = format!("{}/q{}.sql", sql_path, query);
+        Ok(fs::read_to_string(&filename).expect("failed to read query"))
+    } else {
+        Err(DataFusionError::Plan(
+            "invalid query. Expected value between 1 and 22".to_owned(),
+        ))
+    }
+}
+
+async fn register_tables(path: &str, file_format: &str, ctx: &BallistaContext) {
     for table in TABLES {
         match file_format {
             // dbgen creates .tbl ('|' delimited) files without header
@@ -281,7 +483,8 @@ async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> {
                     .file_extension(".tbl");
                 ctx.register_csv(table, &path, options)
                     .await
-                    .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))?;
+                    .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))
+                    .unwrap();
             }
             "csv" => {
                 let path = format!("{}/{}", path, table);
@@ -289,47 +492,21 @@ async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> {
                 let options = CsvReadOptions::new().schema(&schema).has_header(true);
                 ctx.register_csv(table, &path, options)
                     .await
-                    .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))?;
+                    .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))
+                    .unwrap();
             }
             "parquet" => {
                 let path = format!("{}/{}", path, table);
                 ctx.register_parquet(table, &path)
                     .await
-                    .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))?;
+                    .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))
+                    .unwrap();
             }
             other => {
                 unimplemented!("Invalid file format '{}'", other);
             }
         }
     }
-
-    let mut millis = vec![];
-
-    // run benchmark
-    let sql = get_query_sql(opt.query)?;
-    println!("Running benchmark with query {}:\n {}", opt.query, sql);
-    for i in 0..opt.iterations {
-        let start = Instant::now();
-        let df = ctx
-            .sql(&sql)
-            .await
-            .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))?;
-        let batches = df
-            .collect()
-            .await
-            .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))?;
-        let elapsed = start.elapsed().as_secs_f64() * 1000.0;
-        millis.push(elapsed as f64);
-        println!("Query {} iteration {} took {:.1} ms", opt.query, i, elapsed);
-        if opt.debug {
-            pretty::print_batches(&batches)?;
-        }
-    }
-
-    let avg = millis.iter().sum::<f64>() / millis.len() as f64;
-    println!("Query {} avg time: {:.2} ms", opt.query, avg);
-
-    Ok(())
 }
 
 fn get_query_sql(query: usize) -> Result<String> {

Reply via email to