This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 79c83b2  ARROW-10240: [Rust] Optionally load data into memory before running benchmark query
79c83b2 is described below

commit 79c83b261d45810f1ac8acc165dd110875305da4
Author: Jörn Horstmann <[email protected]>
AuthorDate: Sat Oct 10 12:52:13 2020 -0600

    ARROW-10240: [Rust] Optionally load data into memory before running benchmark query
    
    Closes #8409 from jhorstmann/ARROW-10240-load-data-into-memory-for-tpch
    
    Authored-by: Jörn Horstmann <[email protected]>
    Signed-off-by: Andy Grove <[email protected]>
---
 rust/benchmarks/src/bin/tpch.rs                 | 74 ++++++++++++++++---------
 rust/datafusion/benches/sort_limit_query_sql.rs |  2 +-
 rust/datafusion/src/datasource/memory.rs        |  4 +-
 3 files changed, 52 insertions(+), 28 deletions(-)

diff --git a/rust/benchmarks/src/bin/tpch.rs b/rust/benchmarks/src/bin/tpch.rs
index 0fe5a1a..b76ae13 100644
--- a/rust/benchmarks/src/bin/tpch.rs
+++ b/rust/benchmarks/src/bin/tpch.rs
@@ -26,6 +26,8 @@ use arrow::util::pretty;
 use datafusion::error::Result;
 use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
 
+use datafusion::datasource::parquet::ParquetTable;
+use datafusion::datasource::{CsvFile, MemTable, TableProvider};
 use datafusion::physical_plan::csv::CsvReadOptions;
 use structopt::StructOpt;
 
@@ -59,6 +61,10 @@ struct TpchOpt {
     /// File format: `csv` or `parquet`
     #[structopt(short = "f", long = "format", default_value = "csv")]
     file_format: String,
+
+    /// Load the data into a MemTable before executing the query
+    #[structopt(short = "m", long = "mem-table")]
+    mem_table: bool,
 }
 
 #[tokio::main]
@@ -73,31 +79,49 @@ async fn main() -> Result<()> {
 
     let path = opt.path.to_str().unwrap();
 
-    match opt.file_format.as_str() {
-        // dbgen creates .tbl ('|' delimited) files
-        "tbl" => {
-            let path = format!("{}/lineitem.tbl", path);
-            let schema = lineitem_schema();
-            let options = CsvReadOptions::new()
-                .schema(&schema)
-                .delimiter(b'|')
-                .file_extension(".tbl");
-            ctx.register_csv("lineitem", &path, options)?
-        }
-        "csv" => {
-            let path = format!("{}/lineitem", path);
-            let schema = lineitem_schema();
-            let options = CsvReadOptions::new().schema(&schema).has_header(true);
-            ctx.register_csv("lineitem", &path, options)?
-        }
-        "parquet" => {
-            let path = format!("{}/lineitem", path);
-            ctx.register_parquet("lineitem", &path)?
-        }
-        other => {
-            println!("Invalid file format '{}'", other);
-            process::exit(-1);
-        }
+    let tableprovider: Box<dyn TableProvider + Send + Sync> =
+        match opt.file_format.as_str() {
+            // dbgen creates .tbl ('|' delimited) files
+            "tbl" => {
+                let path = format!("{}/lineitem.tbl", path);
+                let schema = lineitem_schema();
+                let options = CsvReadOptions::new()
+                    .schema(&schema)
+                    .delimiter(b'|')
+                    .file_extension(".tbl");
+
+                Box::new(CsvFile::try_new(&path, options)?)
+            }
+            "csv" => {
+                let path = format!("{}/lineitem", path);
+                let schema = lineitem_schema();
+            let options = CsvReadOptions::new().schema(&schema).has_header(true);
+
+                Box::new(CsvFile::try_new(&path, options)?)
+            }
+            "parquet" => {
+                let path = format!("{}/lineitem", path);
+                Box::new(ParquetTable::try_new(&path)?)
+            }
+            other => {
+                println!("Invalid file format '{}'", other);
+                process::exit(-1);
+            }
+        };
+
+    if opt.mem_table {
+        println!("Loading data into memory");
+        let start = Instant::now();
+
+        let memtable = MemTable::load(tableprovider.as_ref(), opt.batch_size).await?;
+        println!(
+            "Loaded data into memory in {} ms",
+            start.elapsed().as_millis()
+        );
+
+        ctx.register_table("lineitem", Box::new(memtable));
+    } else {
+        ctx.register_table("lineitem", tableprovider);
     }
 
     let sql = match opt.query {
diff --git a/rust/datafusion/benches/sort_limit_query_sql.rs b/rust/datafusion/benches/sort_limit_query_sql.rs
index 76e0cae..1b2f162 100644
--- a/rust/datafusion/benches/sort_limit_query_sql.rs
+++ b/rust/datafusion/benches/sort_limit_query_sql.rs
@@ -73,7 +73,7 @@ fn create_context() -> Arc<Mutex<ExecutionContext>> {
     let ctx_holder: Arc<Mutex<Vec<Arc<Mutex<ExecutionContext>>>>> =
         Arc::new(Mutex::new(vec![]));
     rt.block_on(async {
-        let mem_table = MemTable::load(&csv).await.unwrap();
+        let mem_table = MemTable::load(&csv, 16 * 1024).await.unwrap();
 
         // create local execution context
         let mut ctx = ExecutionContext::new();
diff --git a/rust/datafusion/src/datasource/memory.rs b/rust/datafusion/src/datasource/memory.rs
index 8ab22a3..de1c319 100644
--- a/rust/datafusion/src/datasource/memory.rs
+++ b/rust/datafusion/src/datasource/memory.rs
@@ -56,9 +56,9 @@ impl MemTable {
     }
 
     /// Create a mem table by reading from another data source
-    pub async fn load(t: &dyn TableProvider) -> Result<Self> {
+    pub async fn load(t: &dyn TableProvider, batch_size: usize) -> Result<Self> {
         let schema = t.schema();
-        let exec = t.scan(&None, 1024 * 1024)?;
+        let exec = t.scan(&None, batch_size)?;
 
         let mut data: Vec<Vec<RecordBatch>> =
             Vec::with_capacity(exec.output_partitioning().partition_count());

Reply via email to