This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 79c83b2 ARROW-10240: [Rust] Optionally load data into memory before
running benchmark query
79c83b2 is described below
commit 79c83b261d45810f1ac8acc165dd110875305da4
Author: Jörn Horstmann <[email protected]>
AuthorDate: Sat Oct 10 12:52:13 2020 -0600
ARROW-10240: [Rust] Optionally load data into memory before running
benchmark query
Closes #8409 from jhorstmann/ARROW-10240-load-data-into-memory-for-tpch
Authored-by: Jörn Horstmann <[email protected]>
Signed-off-by: Andy Grove <[email protected]>
---
rust/benchmarks/src/bin/tpch.rs | 74 ++++++++++++++++---------
rust/datafusion/benches/sort_limit_query_sql.rs | 2 +-
rust/datafusion/src/datasource/memory.rs | 4 +-
3 files changed, 52 insertions(+), 28 deletions(-)
diff --git a/rust/benchmarks/src/bin/tpch.rs b/rust/benchmarks/src/bin/tpch.rs
index 0fe5a1a..b76ae13 100644
--- a/rust/benchmarks/src/bin/tpch.rs
+++ b/rust/benchmarks/src/bin/tpch.rs
@@ -26,6 +26,8 @@ use arrow::util::pretty;
use datafusion::error::Result;
use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
+use datafusion::datasource::parquet::ParquetTable;
+use datafusion::datasource::{CsvFile, MemTable, TableProvider};
use datafusion::physical_plan::csv::CsvReadOptions;
use structopt::StructOpt;
@@ -59,6 +61,10 @@ struct TpchOpt {
/// File format: `csv` or `parquet`
#[structopt(short = "f", long = "format", default_value = "csv")]
file_format: String,
+
+ /// Load the data into a MemTable before executing the query
+ #[structopt(short = "m", long = "mem-table")]
+ mem_table: bool,
}
#[tokio::main]
@@ -73,31 +79,49 @@ async fn main() -> Result<()> {
let path = opt.path.to_str().unwrap();
- match opt.file_format.as_str() {
- // dbgen creates .tbl ('|' delimited) files
- "tbl" => {
- let path = format!("{}/lineitem.tbl", path);
- let schema = lineitem_schema();
- let options = CsvReadOptions::new()
- .schema(&schema)
- .delimiter(b'|')
- .file_extension(".tbl");
- ctx.register_csv("lineitem", &path, options)?
- }
- "csv" => {
- let path = format!("{}/lineitem", path);
- let schema = lineitem_schema();
- let options = CsvReadOptions::new().schema(&schema).has_header(true);
- ctx.register_csv("lineitem", &path, options)?
- }
- "parquet" => {
- let path = format!("{}/lineitem", path);
- ctx.register_parquet("lineitem", &path)?
- }
- other => {
- println!("Invalid file format '{}'", other);
- process::exit(-1);
- }
+ let tableprovider: Box<dyn TableProvider + Send + Sync> =
+ match opt.file_format.as_str() {
+ // dbgen creates .tbl ('|' delimited) files
+ "tbl" => {
+ let path = format!("{}/lineitem.tbl", path);
+ let schema = lineitem_schema();
+ let options = CsvReadOptions::new()
+ .schema(&schema)
+ .delimiter(b'|')
+ .file_extension(".tbl");
+
+ Box::new(CsvFile::try_new(&path, options)?)
+ }
+ "csv" => {
+ let path = format!("{}/lineitem", path);
+ let schema = lineitem_schema();
+ let options = CsvReadOptions::new().schema(&schema).has_header(true);
+
+ Box::new(CsvFile::try_new(&path, options)?)
+ }
+ "parquet" => {
+ let path = format!("{}/lineitem", path);
+ Box::new(ParquetTable::try_new(&path)?)
+ }
+ other => {
+ println!("Invalid file format '{}'", other);
+ process::exit(-1);
+ }
+ };
+
+ if opt.mem_table {
+ println!("Loading data into memory");
+ let start = Instant::now();
+
+ let memtable = MemTable::load(tableprovider.as_ref(), opt.batch_size).await?;
+ println!(
+ "Loaded data into memory in {} ms",
+ start.elapsed().as_millis()
+ );
+
+ ctx.register_table("lineitem", Box::new(memtable));
+ } else {
+ ctx.register_table("lineitem", tableprovider);
}
let sql = match opt.query {
diff --git a/rust/datafusion/benches/sort_limit_query_sql.rs b/rust/datafusion/benches/sort_limit_query_sql.rs
index 76e0cae..1b2f162 100644
--- a/rust/datafusion/benches/sort_limit_query_sql.rs
+++ b/rust/datafusion/benches/sort_limit_query_sql.rs
@@ -73,7 +73,7 @@ fn create_context() -> Arc<Mutex<ExecutionContext>> {
let ctx_holder: Arc<Mutex<Vec<Arc<Mutex<ExecutionContext>>>>> =
Arc::new(Mutex::new(vec![]));
rt.block_on(async {
- let mem_table = MemTable::load(&csv).await.unwrap();
+ let mem_table = MemTable::load(&csv, 16 * 1024).await.unwrap();
// create local execution context
let mut ctx = ExecutionContext::new();
diff --git a/rust/datafusion/src/datasource/memory.rs b/rust/datafusion/src/datasource/memory.rs
index 8ab22a3..de1c319 100644
--- a/rust/datafusion/src/datasource/memory.rs
+++ b/rust/datafusion/src/datasource/memory.rs
@@ -56,9 +56,9 @@ impl MemTable {
}
/// Create a mem table by reading from another data source
- pub async fn load(t: &dyn TableProvider) -> Result<Self> {
+ pub async fn load(t: &dyn TableProvider, batch_size: usize) -> Result<Self> {
let schema = t.schema();
- let exec = t.scan(&None, 1024 * 1024)?;
+ let exec = t.scan(&None, batch_size)?;
let mut data: Vec<Vec<RecordBatch>> =
Vec::with_capacity(exec.output_partitioning().partition_count());