This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 32951c3  Consolidate benchmarks (#34)
32951c3 is described below

commit 32951c3c1c62e0ceef5aca200e81d2ea9ea124a7
Author: Andy Grove <[email protected]>
AuthorDate: Fri Apr 23 13:34:40 2021 -0600

    Consolidate benchmarks (#34)
---
 .dockerignore                                      |   2 +
 Cargo.toml                                         |   1 -
 ballista/rust/benchmarks/tpch/Cargo.toml           |  36 -
 ballista/rust/benchmarks/tpch/README.md            | 104 ---
 ballista/rust/benchmarks/tpch/src/main.rs          | 360 --------
 .../benchmarks/tpch => benchmarks}/.dockerignore   |   0
 .../rust/benchmarks/tpch => benchmarks}/.gitignore |   0
 benchmarks/Cargo.toml                              |   1 +
 benchmarks/README.md                               | 106 ++-
 .../tpch => benchmarks}/docker-compose.yaml        |   8 +-
 .../benchmarks/tpch => benchmarks}/entrypoint.sh   |   0
 .../benchmarks/tpch => benchmarks}/queries/q1.sql  |   0
 .../benchmarks/tpch => benchmarks}/queries/q10.sql |   0
 .../benchmarks/tpch => benchmarks}/queries/q11.sql |   0
 .../benchmarks/tpch => benchmarks}/queries/q12.sql |   0
 .../benchmarks/tpch => benchmarks}/queries/q13.sql |   0
 .../benchmarks/tpch => benchmarks}/queries/q14.sql |   0
 .../benchmarks/tpch => benchmarks}/queries/q16.sql |   0
 .../benchmarks/tpch => benchmarks}/queries/q17.sql |   0
 .../benchmarks/tpch => benchmarks}/queries/q18.sql |   0
 .../benchmarks/tpch => benchmarks}/queries/q19.sql |   0
 .../benchmarks/tpch => benchmarks}/queries/q2.sql  |   0
 .../benchmarks/tpch => benchmarks}/queries/q20.sql |   0
 .../benchmarks/tpch => benchmarks}/queries/q21.sql |   0
 .../benchmarks/tpch => benchmarks}/queries/q22.sql |   0
 .../benchmarks/tpch => benchmarks}/queries/q3.sql  |   0
 .../benchmarks/tpch => benchmarks}/queries/q4.sql  |   0
 .../benchmarks/tpch => benchmarks}/queries/q5.sql  |   0
 .../benchmarks/tpch => benchmarks}/queries/q6.sql  |   0
 .../benchmarks/tpch => benchmarks}/queries/q7.sql  |   0
 .../benchmarks/tpch => benchmarks}/queries/q8.sql  |   0
 .../benchmarks/tpch => benchmarks}/queries/q9.sql  |   0
 .../rust/benchmarks/tpch => benchmarks}/run.sh     |   1 +
 benchmarks/src/bin/tpch.rs                         | 950 +++------------------
 .../benchmarks/tpch => benchmarks}/tpch-gen.sh     |   2 +-
 .../tpch => benchmarks}/tpchgen.dockerfile         |   0
 dev/build-rust-base.sh                             |   2 +-
 dev/build-rust.sh                                  |   2 +-
 dev/docker/rust.dockerfile                         |   8 +-
 dev/integration-tests.sh                           |   4 +-
 dev/release/rat_exclude_files.txt                  |   2 +-
 41 files changed, 223 insertions(+), 1366 deletions(-)

diff --git a/.dockerignore b/.dockerignore
index 9a64a12..8cd6a89 100644
--- a/.dockerignore
+++ b/.dockerignore
@@ -23,4 +23,6 @@
 
 ci
 dev
+testing
+parquet-testing
 **/target/*
diff --git a/Cargo.toml b/Cargo.toml
index 0947bea..2f34bab 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -20,7 +20,6 @@ members = [
     "datafusion",
     "datafusion-examples",
        "benchmarks",
-    "ballista/rust/benchmarks/tpch",
     "ballista/rust/client",
     "ballista/rust/core",
     "ballista/rust/executor",
diff --git a/ballista/rust/benchmarks/tpch/Cargo.toml 
b/ballista/rust/benchmarks/tpch/Cargo.toml
deleted file mode 100644
index 9311f23..0000000
--- a/ballista/rust/benchmarks/tpch/Cargo.toml
+++ /dev/null
@@ -1,36 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-[package]
-name = "tpch"
-version = "0.5.0-SNAPSHOT"
-homepage = "https://github.com/apache/arrow";
-repository = "https://github.com/apache/arrow";
-authors = ["Apache Arrow <[email protected]>"]
-license = "Apache-2.0"
-edition = "2018"
-
-[dependencies]
-ballista = { path="../../client" }
-datafusion = { path = "../../../../datafusion" }
-
-arrow = { git = "https://github.com/apache/arrow-rs";, rev = 
"c3fe3bab9905739fdda75301dab07a18c91731bd" }
-parquet = { git = "https://github.com/apache/arrow-rs";, rev = 
"c3fe3bab9905739fdda75301dab07a18c91731bd" }
-
-env_logger = "0.8"
-tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread"] }
-structopt = "0.3"
diff --git a/ballista/rust/benchmarks/tpch/README.md 
b/ballista/rust/benchmarks/tpch/README.md
deleted file mode 100644
index 20c4fc7..0000000
--- a/ballista/rust/benchmarks/tpch/README.md
+++ /dev/null
@@ -1,104 +0,0 @@
-<!---
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing,
-  software distributed under the License is distributed on an
-  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  KIND, either express or implied.  See the License for the
-  specific language governing permissions and limitations
-  under the License.
--->
-
-# TPC-H Benchmarks
-
-TPC-H is an industry standard benchmark for testing databases and query 
engines. A command-line tool is available that
-can generate the raw test data at any given scale factor (scale factor refers 
to the amount of data to be generated).
-
-## Generating Test Data
-
-TPC-H data can be generated using the `tpch-gen.sh` script, which creates a 
Docker image containing the TPC-DS data
-generator.
-
-```bash
-./tpch-gen.sh
-```
-
-Data will be generated into the `data` subdirectory and will not be checked in 
because this directory has been added 
-to the `.gitignore` file.
-
-## Running the Benchmarks
-
-To run the benchmarks it is necessary to have at least one Ballista scheduler 
and one Ballista executor running.
-
-To run the scheduler from source:
-
-```bash
-cd $ARROW_HOME/ballista/rust/scheduler
-RUST_LOG=info cargo run --release
-```
-
-By default the scheduler will bind to `0.0.0.0` and listen on port 50050.
-
-To run the executor from source:
-
-```bash
-cd $ARROW_HOME/ballista/rust/executor
-RUST_LOG=info cargo run --release
-```
-
-By default the executor will bind to `0.0.0.0` and listen on port 50051.
-
-You can add SIMD/snmalloc/LTO flags to improve speed (with longer build times):
-
-```
-RUST_LOG=info RUSTFLAGS='-C target-cpu=native -C lto -C codegen-units=1 -C 
embed-bitcode' cargo run --release --bin executor --features "simd snmalloc" 
--target x86_64-unknown-linux-gnu
-```
-
-To run the benchmarks:
-
-```bash
-cd $ARROW_HOME/ballista/rust/benchmarks/tpch
-cargo run --release benchmark --host localhost --port 50050 --query 1 --path 
$(pwd)/data --format tbl
-```
-
-## Running the Benchmarks on docker-compose
-
-To start a Rust scheduler and executor using Docker Compose:
-
-```bash
-cd $BALLISTA_HOME
-./dev/build-rust.sh
-cd $BALLISTA_HOME/rust/benchmarks/tpch
-docker-compose up
-```
-
-Then you can run the benchmark with:
-
-```bash
-docker-compose run ballista-client cargo run benchmark --host 
ballista-scheduler --port 50050 --query 1 --path /data --format tbl
-```
-
-## Expected output
-
-The result of query 1 should produce the following output when executed 
against the SF=1 dataset.
-
-```
-+--------------+--------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------+-------------+
-| l_returnflag | l_linestatus | sum_qty  | sum_base_price     | sum_disc_price 
    | sum_charge         | avg_qty            | avg_price          | avg_disc   
          | count_order |
-+--------------+--------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------+-------------+
-| A            | F            | 37734107 | 56586554400.73001  | 
53758257134.870026 | 55909065222.82768  | 25.522005853257337 | 
38273.12973462168  | 0.049985295838396455 | 1478493     |
-| N            | F            | 991417   | 1487504710.3799996 | 
1413082168.0541    | 1469649223.1943746 | 25.516471920522985 | 
38284.467760848296 | 0.05009342667421622  | 38854       |
-| N            | O            | 74476023 | 111701708529.50996 | 
106118209986.10472 | 110367023144.56622 | 25.502229680934594 | 38249.1238377803 
  | 0.049996589476752576 | 2920373     |
-| R            | F            | 37719753 | 56568041380.90001  | 
53741292684.60399  | 55889619119.83194  | 25.50579361269077  | 
38250.854626099666 | 0.05000940583012587  | 1478870     |
-+--------------+--------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------+-------------+
-Query 1 iteration 0 took 1956.1 ms
-Query 1 avg time: 1956.11 ms
-```
diff --git a/ballista/rust/benchmarks/tpch/src/main.rs 
b/ballista/rust/benchmarks/tpch/src/main.rs
deleted file mode 100644
index 1ba46ea..0000000
--- a/ballista/rust/benchmarks/tpch/src/main.rs
+++ /dev/null
@@ -1,360 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Benchmark derived from TPC-H. This is not an official TPC-H benchmark.
-//!
-//! This is a modified version of the DataFusion version of these benchmarks.
-
-use std::collections::HashMap;
-use std::fs;
-use std::path::{Path, PathBuf};
-use std::time::Instant;
-
-use arrow::datatypes::{DataType, Field, Schema};
-use arrow::util::pretty;
-use ballista::prelude::*;
-use datafusion::prelude::*;
-use parquet::basic::Compression;
-use parquet::file::properties::WriterProperties;
-use structopt::StructOpt;
-
-#[derive(Debug, StructOpt)]
-struct BenchmarkOpt {
-    /// Ballista executor host
-    #[structopt(long = "host")]
-    host: String,
-
-    /// Ballista executor port
-    #[structopt(long = "port")]
-    port: u16,
-
-    /// Query number
-    #[structopt(long)]
-    query: usize,
-
-    /// Activate debug mode to see query results
-    #[structopt(long)]
-    debug: bool,
-
-    /// Number of iterations of each test run
-    #[structopt(long = "iterations", default_value = "1")]
-    iterations: usize,
-
-    /// Batch size when reading CSV or Parquet files
-    #[structopt(long = "batch-size", default_value = "32768")]
-    batch_size: usize,
-
-    /// Path to data files
-    #[structopt(parse(from_os_str), required = true, long = "path")]
-    path: PathBuf,
-
-    /// File format: `csv`, `tbl` or `parquet`
-    #[structopt(long = "format")]
-    file_format: String,
-}
-
-#[derive(Debug, StructOpt)]
-struct ConvertOpt {
-    /// Path to csv files
-    #[structopt(parse(from_os_str), required = true, short = "i", long = 
"input")]
-    input_path: PathBuf,
-
-    /// Output path
-    #[structopt(parse(from_os_str), required = true, short = "o", long = 
"output")]
-    output_path: PathBuf,
-
-    /// Output file format: `csv` or `parquet`
-    #[structopt(short = "f", long = "format")]
-    file_format: String,
-
-    /// Compression to use when writing Parquet files
-    #[structopt(short = "c", long = "compression", default_value = "snappy")]
-    compression: String,
-
-    /// Number of partitions to produce
-    #[structopt(short = "p", long = "partitions", default_value = "1")]
-    partitions: usize,
-
-    /// Batch size when reading CSV or Parquet files
-    #[structopt(short = "s", long = "batch-size", default_value = "4096")]
-    batch_size: usize,
-}
-
-#[derive(Debug, StructOpt)]
-#[structopt(name = "TPC-H", about = "TPC-H Benchmarks.")]
-enum TpchOpt {
-    Benchmark(BenchmarkOpt),
-    Convert(ConvertOpt),
-}
-
-const TABLES: &[&str] = &[
-    "part", "supplier", "partsupp", "customer", "orders", "lineitem", 
"nation", "region",
-];
-
-#[tokio::main]
-async fn main() -> Result<()> {
-    env_logger::init();
-    match TpchOpt::from_args() {
-        TpchOpt::Benchmark(opt) => benchmark(opt).await.map(|_| ()),
-        TpchOpt::Convert(opt) => convert_tbl(opt).await,
-    }
-}
-
-async fn benchmark(opt: BenchmarkOpt) -> Result<()> {
-    println!("Running benchmarks with the following options: {:?}", opt);
-
-    let mut settings = HashMap::new();
-    settings.insert("batch.size".to_owned(), format!("{}", opt.batch_size));
-
-    let ctx = BallistaContext::remote(opt.host.as_str(), opt.port, settings);
-
-    // register tables with Ballista context
-    let path = opt.path.to_str().unwrap();
-    let file_format = opt.file_format.as_str();
-    for table in TABLES {
-        match file_format {
-            // dbgen creates .tbl ('|' delimited) files without header
-            "tbl" => {
-                let path = format!("{}/{}.tbl", path, table);
-                let schema = get_schema(table);
-                let options = CsvReadOptions::new()
-                    .schema(&schema)
-                    .delimiter(b'|')
-                    .has_header(false)
-                    .file_extension(".tbl");
-                ctx.register_csv(table, &path, options)?;
-            }
-            "csv" => {
-                let path = format!("{}/{}", path, table);
-                let schema = get_schema(table);
-                let options = 
CsvReadOptions::new().schema(&schema).has_header(true);
-                ctx.register_csv(table, &path, options)?;
-            }
-            "parquet" => {
-                let path = format!("{}/{}", path, table);
-                ctx.register_parquet(table, &path)?;
-            }
-            other => {
-                unimplemented!("Invalid file format '{}'", other);
-            }
-        }
-    }
-
-    let mut millis = vec![];
-
-    // run benchmark
-    let sql = get_query_sql(opt.query)?;
-    println!("Running benchmark with query {}:\n {}", opt.query, sql);
-    for i in 0..opt.iterations {
-        let start = Instant::now();
-        let df = ctx.sql(&sql)?;
-        let mut batches = vec![];
-        let mut stream = df.collect().await?;
-        while let Some(result) = stream.next().await {
-            let batch = result?;
-            batches.push(batch);
-        }
-        let elapsed = start.elapsed().as_secs_f64() * 1000.0;
-        millis.push(elapsed as f64);
-        println!("Query {} iteration {} took {:.1} ms", opt.query, i, elapsed);
-        if opt.debug {
-            pretty::print_batches(&batches)?;
-        }
-    }
-
-    let avg = millis.iter().sum::<f64>() / millis.len() as f64;
-    println!("Query {} avg time: {:.2} ms", opt.query, avg);
-
-    Ok(())
-}
-
-fn get_query_sql(query: usize) -> Result<String> {
-    if query > 0 && query < 23 {
-        let filename = format!("queries/q{}.sql", query);
-        Ok(fs::read_to_string(&filename).expect("failed to read query"))
-    } else {
-        Err(BallistaError::General(
-            "invalid query. Expected value between 1 and 22".to_owned(),
-        ))
-    }
-}
-
-async fn convert_tbl(opt: ConvertOpt) -> Result<()> {
-    let output_root_path = Path::new(&opt.output_path);
-    for table in TABLES {
-        let start = Instant::now();
-        let schema = get_schema(table);
-
-        let input_path = format!("{}/{}.tbl", 
opt.input_path.to_str().unwrap(), table);
-        let options = CsvReadOptions::new()
-            .schema(&schema)
-            .delimiter(b'|')
-            .file_extension(".tbl");
-
-        let config = ExecutionConfig::new().with_batch_size(opt.batch_size);
-        let mut ctx = ExecutionContext::with_config(config);
-
-        // build plan to read the TBL file
-        let mut csv = ctx.read_csv(&input_path, options)?;
-
-        // optionally, repartition the file
-        if opt.partitions > 1 {
-            csv = 
csv.repartition(Partitioning::RoundRobinBatch(opt.partitions))?
-        }
-
-        // create the physical plan
-        let csv = csv.to_logical_plan();
-        let csv = ctx.optimize(&csv)?;
-        let csv = ctx.create_physical_plan(&csv)?;
-
-        let output_path = output_root_path.join(table);
-        let output_path = output_path.to_str().unwrap().to_owned();
-
-        println!(
-            "Converting '{}' to {} files in directory '{}'",
-            &input_path, &opt.file_format, &output_path
-        );
-        match opt.file_format.as_str() {
-            "csv" => ctx.write_csv(csv, output_path).await?,
-            "parquet" => {
-                let compression = match opt.compression.as_str() {
-                    "none" => Compression::UNCOMPRESSED,
-                    "snappy" => Compression::SNAPPY,
-                    "brotli" => Compression::BROTLI,
-                    "gzip" => Compression::GZIP,
-                    "lz4" => Compression::LZ4,
-                    "lz0" => Compression::LZO,
-                    "zstd" => Compression::ZSTD,
-                    other => {
-                        return Err(BallistaError::NotImplemented(format!(
-                            "Invalid compression format: {}",
-                            other
-                        )))
-                    }
-                };
-                let props = WriterProperties::builder()
-                    .set_compression(compression)
-                    .build();
-                ctx.write_parquet(csv, output_path, Some(props)).await?
-            }
-            other => {
-                return Err(BallistaError::NotImplemented(format!(
-                    "Invalid output format: {}",
-                    other
-                )))
-            }
-        }
-        println!("Conversion completed in {} ms", start.elapsed().as_millis());
-    }
-
-    Ok(())
-}
-
-fn get_schema(table: &str) -> Schema {
-    // note that the schema intentionally uses signed integers so that any 
generated Parquet
-    // files can also be used to benchmark tools that only support signed 
integers, such as
-    // Apache Spark
-
-    match table {
-        "part" => Schema::new(vec![
-            Field::new("p_partkey", DataType::Int32, false),
-            Field::new("p_name", DataType::Utf8, false),
-            Field::new("p_mfgr", DataType::Utf8, false),
-            Field::new("p_brand", DataType::Utf8, false),
-            Field::new("p_type", DataType::Utf8, false),
-            Field::new("p_size", DataType::Int32, false),
-            Field::new("p_container", DataType::Utf8, false),
-            Field::new("p_retailprice", DataType::Float64, false),
-            Field::new("p_comment", DataType::Utf8, false),
-        ]),
-
-        "supplier" => Schema::new(vec![
-            Field::new("s_suppkey", DataType::Int32, false),
-            Field::new("s_name", DataType::Utf8, false),
-            Field::new("s_address", DataType::Utf8, false),
-            Field::new("s_nationkey", DataType::Int32, false),
-            Field::new("s_phone", DataType::Utf8, false),
-            Field::new("s_acctbal", DataType::Float64, false),
-            Field::new("s_comment", DataType::Utf8, false),
-        ]),
-
-        "partsupp" => Schema::new(vec![
-            Field::new("ps_partkey", DataType::Int32, false),
-            Field::new("ps_suppkey", DataType::Int32, false),
-            Field::new("ps_availqty", DataType::Int32, false),
-            Field::new("ps_supplycost", DataType::Float64, false),
-            Field::new("ps_comment", DataType::Utf8, false),
-        ]),
-
-        "customer" => Schema::new(vec![
-            Field::new("c_custkey", DataType::Int32, false),
-            Field::new("c_name", DataType::Utf8, false),
-            Field::new("c_address", DataType::Utf8, false),
-            Field::new("c_nationkey", DataType::Int32, false),
-            Field::new("c_phone", DataType::Utf8, false),
-            Field::new("c_acctbal", DataType::Float64, false),
-            Field::new("c_mktsegment", DataType::Utf8, false),
-            Field::new("c_comment", DataType::Utf8, false),
-        ]),
-
-        "orders" => Schema::new(vec![
-            Field::new("o_orderkey", DataType::Int32, false),
-            Field::new("o_custkey", DataType::Int32, false),
-            Field::new("o_orderstatus", DataType::Utf8, false),
-            Field::new("o_totalprice", DataType::Float64, false),
-            Field::new("o_orderdate", DataType::Date32, false),
-            Field::new("o_orderpriority", DataType::Utf8, false),
-            Field::new("o_clerk", DataType::Utf8, false),
-            Field::new("o_shippriority", DataType::Int32, false),
-            Field::new("o_comment", DataType::Utf8, false),
-        ]),
-
-        "lineitem" => Schema::new(vec![
-            Field::new("l_orderkey", DataType::Int32, false),
-            Field::new("l_partkey", DataType::Int32, false),
-            Field::new("l_suppkey", DataType::Int32, false),
-            Field::new("l_linenumber", DataType::Int32, false),
-            Field::new("l_quantity", DataType::Float64, false),
-            Field::new("l_extendedprice", DataType::Float64, false),
-            Field::new("l_discount", DataType::Float64, false),
-            Field::new("l_tax", DataType::Float64, false),
-            Field::new("l_returnflag", DataType::Utf8, false),
-            Field::new("l_linestatus", DataType::Utf8, false),
-            Field::new("l_shipdate", DataType::Date32, false),
-            Field::new("l_commitdate", DataType::Date32, false),
-            Field::new("l_receiptdate", DataType::Date32, false),
-            Field::new("l_shipinstruct", DataType::Utf8, false),
-            Field::new("l_shipmode", DataType::Utf8, false),
-            Field::new("l_comment", DataType::Utf8, false),
-        ]),
-
-        "nation" => Schema::new(vec![
-            Field::new("n_nationkey", DataType::Int32, false),
-            Field::new("n_name", DataType::Utf8, false),
-            Field::new("n_regionkey", DataType::Int32, false),
-            Field::new("n_comment", DataType::Utf8, false),
-        ]),
-
-        "region" => Schema::new(vec![
-            Field::new("r_regionkey", DataType::Int32, false),
-            Field::new("r_name", DataType::Utf8, false),
-            Field::new("r_comment", DataType::Utf8, false),
-        ]),
-
-        _ => unimplemented!(),
-    }
-}
diff --git a/ballista/rust/benchmarks/tpch/.dockerignore 
b/benchmarks/.dockerignore
similarity index 100%
rename from ballista/rust/benchmarks/tpch/.dockerignore
rename to benchmarks/.dockerignore
diff --git a/ballista/rust/benchmarks/tpch/.gitignore b/benchmarks/.gitignore
similarity index 100%
rename from ballista/rust/benchmarks/tpch/.gitignore
rename to benchmarks/.gitignore
diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index 6eb6ab9..3562266 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -34,6 +34,7 @@ snmalloc = ["snmalloc-rs"]
 arrow = { git = "https://github.com/apache/arrow-rs";, rev = 
"c3fe3bab9905739fdda75301dab07a18c91731bd" }
 parquet = { git = "https://github.com/apache/arrow-rs";, rev = 
"c3fe3bab9905739fdda75301dab07a18c91731bd" }
 datafusion = { path = "../datafusion" }
+ballista = { path = "../ballista/rust/client" }
 structopt = { version = "0.3", default-features = false }
 tokio = { version = "^1.0", features = ["macros", "rt", "rt-multi-thread"] }
 futures = "0.3"
diff --git a/benchmarks/README.md b/benchmarks/README.md
index 7460477..e003d96 100644
--- a/benchmarks/README.md
+++ b/benchmarks/README.md
@@ -17,53 +17,47 @@
   under the License.
 -->
 
-# Apache Arrow Rust Benchmarks
+# DataFusion and Ballista Benchmarks
 
 This crate contains benchmarks based on popular public data sets and open 
source benchmark suites, making it easy to
 run real-world benchmarks to help with performance and scalability testing and 
for comparing performance with other Arrow
 implementations as well as other query engines.
 
-Currently, only DataFusion benchmarks exist, but the plan is to add benchmarks 
for the arrow, flight, and parquet
-crates as well.
-
 ## Benchmark derived from TPC-H
 
 These benchmarks are derived from the [TPC-H][1] benchmark.
 
-Data for this benchmark can be generated using the [tpch-dbgen][2] 
command-line tool. Run the following commands to
-clone the repository and build the source code.
+## Generating Test Data
+
+TPC-H data can be generated using the `tpch-gen.sh` script, which creates a 
Docker image containing the TPC-DS data
+generator.
 
 ```bash
-git clone [email protected]:databricks/tpch-dbgen.git
-cd tpch-dbgen
-make
-export TPCH_DATA=$(pwd)
+./tpch-gen.sh
 ```
 
-Data can now be generated with the following command. Note that `-s 1` means 
use Scale Factor 1 or ~1 GB of
-data. This value can be increased to generate larger data sets.
+Data will be generated into the `data` subdirectory and will not be checked in 
because this directory has been added
+to the `.gitignore` file.
 
-```bash
-./dbgen -vf -s 1
-```
+## Running the DataFusion Benchmarks
 
-The benchmark can then be run (assuming the data created from `dbgen` is in 
`/mnt/tpch-dbgen`) with a command such as:
+The benchmark can then be run (assuming the data created from `dbgen` is in 
`./data`) with a command such as:
 
 ```bash
-cargo run --release --bin tpch -- benchmark --iterations 3 --path 
/mnt/tpch-dbgen --format tbl --query 1 --batch-size 4096
+cargo run --release --bin tpch -- benchmark --iterations 3 --path ./data 
--format tbl --query 1 --batch-size 4096
 ```
 
 You can enable the features `simd` (to use SIMD instructions) and/or 
`mimalloc` or `snmalloc` (to use either the mimalloc or snmalloc allocator) as 
features by passing them in as `--features`:
 
 ```
-cargo run --release --features "simd mimalloc" --bin tpch -- benchmark 
--iterations 3 --path /mnt/tpch-dbgen --format tbl --query 1 --batch-size 4096
+cargo run --release --features "simd mimalloc" --bin tpch -- benchmark 
--iterations 3 --path ./data --format tbl --query 1 --batch-size 4096
 ```
 
 The benchmark program also supports CSV and Parquet input file formats and a 
utility is provided to convert from `tbl`
 (generated by the `dbgen` utility) to CSV and Parquet.
 
 ```bash
-cargo run --release --bin tpch -- convert --input /mnt/tpch-dbgen --output 
/mnt/tpch-parquet --format parquet
+cargo run --release --bin tpch -- convert --input ./data --output 
/mnt/tpch-parquet --format parquet
 ```
 
 This utility does not yet provide support for changing the number of 
partitions when performing the conversion. Another
@@ -97,9 +91,78 @@ docker run -v /mnt:/mnt -it 
ballistacompute/spark-benchmarks:0.4.0-SNAPSHOT \
   --partitions 64
 ```
 
+## Running the Ballista Benchmarks
+
+To run the benchmarks it is necessary to have at least one Ballista scheduler 
and one Ballista executor running.
+
+To run the scheduler from source:
+
+```bash
+cd $ARROW_HOME/ballista/rust/scheduler
+RUST_LOG=info cargo run --release
+```
+
+By default the scheduler will bind to `0.0.0.0` and listen on port 50050.
+
+To run the executor from source:
+
+```bash
+cd $ARROW_HOME/ballista/rust/executor
+RUST_LOG=info cargo run --release
+```
+
+By default the executor will bind to `0.0.0.0` and listen on port 50051.
+
+You can add SIMD/snmalloc/LTO flags to improve speed (with longer build times):
+
+```
+RUST_LOG=info RUSTFLAGS='-C target-cpu=native -C lto -C codegen-units=1 -C 
embed-bitcode' cargo run --release --bin executor --features "simd snmalloc" 
--target x86_64-unknown-linux-gnu
+```
+
+To run the benchmarks:
+
+```bash
+cd $ARROW_HOME/ballista/rust/benchmarks/tpch
+cargo run --release benchmark --host localhost --port 50050 --query 1 --path 
$(pwd)/data --format tbl
+```
+
+## Running the Ballista Benchmarks on docker-compose
+
+To start a Rust scheduler and executor using Docker Compose:
+
+```bash
+cd $BALLISTA_HOME
+./dev/build-rust.sh
+cd $BALLISTA_HOME/rust/benchmarks/tpch
+docker-compose up
+```
+
+Then you can run the benchmark with:
+
+```bash
+docker-compose run ballista-client cargo run benchmark --host 
ballista-scheduler --port 50050 --query 1 --path /data --format tbl
+```
+
+## Expected output
+
+The result of query 1 should produce the following output when executed 
against the SF=1 dataset.
+
+```
++--------------+--------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------+-------------+
+| l_returnflag | l_linestatus | sum_qty  | sum_base_price     | sum_disc_price 
    | sum_charge         | avg_qty            | avg_price          | avg_disc   
          | count_order |
++--------------+--------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------+-------------+
+| A            | F            | 37734107 | 56586554400.73001  | 
53758257134.870026 | 55909065222.82768  | 25.522005853257337 | 
38273.12973462168  | 0.049985295838396455 | 1478493     |
+| N            | F            | 991417   | 1487504710.3799996 | 
1413082168.0541    | 1469649223.1943746 | 25.516471920522985 | 
38284.467760848296 | 0.05009342667421622  | 38854       |
+| N            | O            | 74476023 | 111701708529.50996 | 
106118209986.10472 | 110367023144.56622 | 25.502229680934594 | 38249.1238377803 
  | 0.049996589476752576 | 2920373     |
+| R            | F            | 37719753 | 56568041380.90001  | 
53741292684.60399  | 55889619119.83194  | 25.50579361269077  | 
38250.854626099666 | 0.05000940583012587  | 1478870     |
++--------------+--------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------+-------------+
+Query 1 iteration 0 took 1956.1 ms
+Query 1 avg time: 1956.11 ms
+```
+
 ## NYC Taxi Benchmark
 
-These benchmarks are based on the [New York Taxi and Limousine Commission][3] 
data set.
+These benchmarks are based on the [New York Taxi and Limousine Commission][2] 
data set.
 
 ```bash
 cargo run --release --bin nyctaxi -- --iterations 3 --path /mnt/nyctaxi/csv 
--format csv --batch-size 4096
@@ -116,5 +179,4 @@ Query 'fare_amt_by_passenger' iteration 2 took 7969 ms
 ```
 
 [1]: http://www.tpc.org/tpch/
-[2]: https://github.com/databricks/tpch-dbgen
-[3]: https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page
+[2]: https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page
diff --git a/ballista/rust/benchmarks/tpch/docker-compose.yaml 
b/benchmarks/docker-compose.yaml
similarity index 90%
rename from ballista/rust/benchmarks/tpch/docker-compose.yaml
rename to benchmarks/docker-compose.yaml
index f872ce1..6015dba 100644
--- a/ballista/rust/benchmarks/tpch/docker-compose.yaml
+++ b/benchmarks/docker-compose.yaml
@@ -20,7 +20,7 @@ services:
    image: quay.io/coreos/etcd:v3.4.9
    command: "etcd -advertise-client-urls http://etcd:2379 -listen-client-urls 
http://0.0.0.0:2379"
   ballista-scheduler:
-    image: ballistacompute/ballista-rust:0.4.2-SNAPSHOT
+    image: ballistacompute/ballista-rust:0.5.0-SNAPSHOT
     command: "/scheduler --config-backend etcd --etcd-urls etcd:2379 
--bind-host 0.0.0.0 --port 50050"
     environment:
       - RUST_LOG=ballista=debug
@@ -29,7 +29,7 @@ services:
     depends_on:
       - etcd
   ballista-executor-1:
-    image: ballistacompute/ballista-rust:0.4.2-SNAPSHOT
+    image: ballistacompute/ballista-rust:0.5.0-SNAPSHOT
     command: "/executor --bind-host 0.0.0.0 --port 50051 --external-host 
ballista-executor-1 --scheduler-host ballista-scheduler"
     environment:
       - RUST_LOG=info
@@ -38,7 +38,7 @@ services:
     depends_on:
       - ballista-scheduler
   ballista-executor-2:
-    image: ballistacompute/ballista-rust:0.4.2-SNAPSHOT
+    image: ballistacompute/ballista-rust:0.5.0-SNAPSHOT
     command: "/executor --bind-host 0.0.0.0 --port 50052 --external-host 
ballista-executor-2 --scheduler-host ballista-scheduler"
     environment:
       - RUST_LOG=info
@@ -47,7 +47,7 @@ services:
     depends_on:
       - ballista-scheduler
   ballista-client:
-    image: ballistacompute/ballista-rust:0.4.2-SNAPSHOT
+    image: ballistacompute/ballista-rust:0.5.0-SNAPSHOT
     command: "/bin/sh" # do nothing
     working_dir: /ballista/benchmarks/tpch
     environment:
diff --git a/ballista/rust/benchmarks/tpch/entrypoint.sh 
b/benchmarks/entrypoint.sh
similarity index 100%
rename from ballista/rust/benchmarks/tpch/entrypoint.sh
rename to benchmarks/entrypoint.sh
diff --git a/ballista/rust/benchmarks/tpch/queries/q1.sql 
b/benchmarks/queries/q1.sql
similarity index 100%
rename from ballista/rust/benchmarks/tpch/queries/q1.sql
rename to benchmarks/queries/q1.sql
diff --git a/ballista/rust/benchmarks/tpch/queries/q10.sql 
b/benchmarks/queries/q10.sql
similarity index 100%
rename from ballista/rust/benchmarks/tpch/queries/q10.sql
rename to benchmarks/queries/q10.sql
diff --git a/ballista/rust/benchmarks/tpch/queries/q11.sql 
b/benchmarks/queries/q11.sql
similarity index 100%
rename from ballista/rust/benchmarks/tpch/queries/q11.sql
rename to benchmarks/queries/q11.sql
diff --git a/ballista/rust/benchmarks/tpch/queries/q12.sql 
b/benchmarks/queries/q12.sql
similarity index 100%
rename from ballista/rust/benchmarks/tpch/queries/q12.sql
rename to benchmarks/queries/q12.sql
diff --git a/ballista/rust/benchmarks/tpch/queries/q13.sql 
b/benchmarks/queries/q13.sql
similarity index 100%
rename from ballista/rust/benchmarks/tpch/queries/q13.sql
rename to benchmarks/queries/q13.sql
diff --git a/ballista/rust/benchmarks/tpch/queries/q14.sql 
b/benchmarks/queries/q14.sql
similarity index 100%
rename from ballista/rust/benchmarks/tpch/queries/q14.sql
rename to benchmarks/queries/q14.sql
diff --git a/ballista/rust/benchmarks/tpch/queries/q16.sql 
b/benchmarks/queries/q16.sql
similarity index 100%
rename from ballista/rust/benchmarks/tpch/queries/q16.sql
rename to benchmarks/queries/q16.sql
diff --git a/ballista/rust/benchmarks/tpch/queries/q17.sql 
b/benchmarks/queries/q17.sql
similarity index 100%
rename from ballista/rust/benchmarks/tpch/queries/q17.sql
rename to benchmarks/queries/q17.sql
diff --git a/ballista/rust/benchmarks/tpch/queries/q18.sql 
b/benchmarks/queries/q18.sql
similarity index 100%
rename from ballista/rust/benchmarks/tpch/queries/q18.sql
rename to benchmarks/queries/q18.sql
diff --git a/ballista/rust/benchmarks/tpch/queries/q19.sql 
b/benchmarks/queries/q19.sql
similarity index 100%
rename from ballista/rust/benchmarks/tpch/queries/q19.sql
rename to benchmarks/queries/q19.sql
diff --git a/ballista/rust/benchmarks/tpch/queries/q2.sql 
b/benchmarks/queries/q2.sql
similarity index 100%
rename from ballista/rust/benchmarks/tpch/queries/q2.sql
rename to benchmarks/queries/q2.sql
diff --git a/ballista/rust/benchmarks/tpch/queries/q20.sql 
b/benchmarks/queries/q20.sql
similarity index 100%
rename from ballista/rust/benchmarks/tpch/queries/q20.sql
rename to benchmarks/queries/q20.sql
diff --git a/ballista/rust/benchmarks/tpch/queries/q21.sql 
b/benchmarks/queries/q21.sql
similarity index 100%
rename from ballista/rust/benchmarks/tpch/queries/q21.sql
rename to benchmarks/queries/q21.sql
diff --git a/ballista/rust/benchmarks/tpch/queries/q22.sql 
b/benchmarks/queries/q22.sql
similarity index 100%
rename from ballista/rust/benchmarks/tpch/queries/q22.sql
rename to benchmarks/queries/q22.sql
diff --git a/ballista/rust/benchmarks/tpch/queries/q3.sql 
b/benchmarks/queries/q3.sql
similarity index 100%
rename from ballista/rust/benchmarks/tpch/queries/q3.sql
rename to benchmarks/queries/q3.sql
diff --git a/ballista/rust/benchmarks/tpch/queries/q4.sql 
b/benchmarks/queries/q4.sql
similarity index 100%
rename from ballista/rust/benchmarks/tpch/queries/q4.sql
rename to benchmarks/queries/q4.sql
diff --git a/ballista/rust/benchmarks/tpch/queries/q5.sql 
b/benchmarks/queries/q5.sql
similarity index 100%
rename from ballista/rust/benchmarks/tpch/queries/q5.sql
rename to benchmarks/queries/q5.sql
diff --git a/ballista/rust/benchmarks/tpch/queries/q6.sql 
b/benchmarks/queries/q6.sql
similarity index 100%
rename from ballista/rust/benchmarks/tpch/queries/q6.sql
rename to benchmarks/queries/q6.sql
diff --git a/ballista/rust/benchmarks/tpch/queries/q7.sql 
b/benchmarks/queries/q7.sql
similarity index 100%
rename from ballista/rust/benchmarks/tpch/queries/q7.sql
rename to benchmarks/queries/q7.sql
diff --git a/ballista/rust/benchmarks/tpch/queries/q8.sql 
b/benchmarks/queries/q8.sql
similarity index 100%
rename from ballista/rust/benchmarks/tpch/queries/q8.sql
rename to benchmarks/queries/q8.sql
diff --git a/ballista/rust/benchmarks/tpch/queries/q9.sql 
b/benchmarks/queries/q9.sql
similarity index 100%
rename from ballista/rust/benchmarks/tpch/queries/q9.sql
rename to benchmarks/queries/q9.sql
diff --git a/ballista/rust/benchmarks/tpch/run.sh b/benchmarks/run.sh
similarity index 99%
rename from ballista/rust/benchmarks/tpch/run.sh
rename to benchmarks/run.sh
index c8a36b6..fd97ff9 100755
--- a/ballista/rust/benchmarks/tpch/run.sh
+++ b/benchmarks/run.sh
@@ -19,6 +19,7 @@ set -e
 
 # This bash script is meant to be run inside the docker-compose environment. 
Check the README for instructions
 
+cd /
 for query in 1 3 5 6 10 12
 do
   /tpch benchmark --host ballista-scheduler --port 50050 --query $query --path 
/data --format tbl --iterations 1 --debug
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index b203ceb..fd9f052 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -17,21 +17,26 @@
 
 //! Benchmark derived from TPC-H. This is not an official TPC-H benchmark.
 
-use std::time::Instant;
 use std::{
+    collections::HashMap,
+    fs,
+    iter::Iterator,
     path::{Path, PathBuf},
     sync::Arc,
+    time::Instant,
 };
 
+use futures::StreamExt;
+
 use arrow::datatypes::{DataType, Field, Schema};
 use arrow::util::pretty;
+use ballista::context::BallistaContext;
 use datafusion::datasource::parquet::ParquetTable;
 use datafusion::datasource::{CsvFile, MemTable, TableProvider};
 use datafusion::error::{DataFusionError, Result};
 use datafusion::logical_plan::LogicalPlan;
 use datafusion::physical_plan::collect;
 use datafusion::prelude::*;
-
 use parquet::basic::Compression;
 use parquet::file::properties::WriterProperties;
 use structopt::StructOpt;
@@ -44,7 +49,7 @@ static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
 #[global_allocator]
 static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;
 
-#[derive(Debug, StructOpt)]
+#[derive(Debug, StructOpt, Clone)]
 struct BenchmarkOpt {
     /// Query number
     #[structopt(short, long)]
@@ -81,6 +86,14 @@ struct BenchmarkOpt {
     /// Number of partitions to create when using MemTable as input
     #[structopt(short = "n", long = "partitions", default_value = "8")]
     partitions: usize,
+
+    /// Ballista executor host
+    #[structopt(long = "host")]
+    host: Option<String>,
+
+    /// Ballista executor port
+    #[structopt(long = "port")]
+    port: Option<u16>,
 }
 
 #[derive(Debug, StructOpt)]
@@ -125,12 +138,20 @@ const TABLES: &[&str] = &[
 async fn main() -> Result<()> {
     env_logger::init();
     match TpchOpt::from_args() {
-        TpchOpt::Benchmark(opt) => benchmark(opt).await.map(|_| ()),
+        TpchOpt::Benchmark(opt) => {
+            if opt.host.is_some() && opt.port.is_some() {
+                benchmark_ballista(opt).await.map(|_| ())
+            } else {
+                benchmark_datafusion(opt).await.map(|_| ())
+            }
+        }
         TpchOpt::Convert(opt) => convert_tbl(opt).await,
     }
 }
 
-async fn benchmark(opt: BenchmarkOpt) -> 
Result<Vec<arrow::record_batch::RecordBatch>> {
+async fn benchmark_datafusion(
+    opt: BenchmarkOpt,
+) -> Result<Vec<arrow::record_batch::RecordBatch>> {
     println!("Running benchmarks with the following options: {:?}", opt);
     let config = ExecutionConfig::new()
         .with_concurrency(opt.concurrency)
@@ -181,832 +202,97 @@ async fn benchmark(opt: BenchmarkOpt) -> 
Result<Vec<arrow::record_batch::RecordB
     Ok(result)
 }
 
-fn create_logical_plan(ctx: &mut ExecutionContext, query: usize) -> 
Result<LogicalPlan> {
-    match query {
-        // original
-        // 1 => ctx.create_logical_plan(
-        //     "select
-        //         l_returnflag,
-        //         l_linestatus,
-        //         sum(l_quantity) as sum_qty,
-        //         sum(l_extendedprice) as sum_base_price,
-        //         sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
-        //         sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as 
sum_charge,
-        //         avg(l_quantity) as avg_qty,
-        //         avg(l_extendedprice) as avg_price,
-        //         avg(l_discount) as avg_disc,
-        //         count(*) as count_order
-        //     from
-        //         lineitem
-        //     where
-        //         l_shipdate <= date '1998-12-01' - interval '90' day (3)
-        //     group by
-        //         l_returnflag,
-        //         l_linestatus
-        //     order by
-        //         l_returnflag,
-        //         l_linestatus;"
-        // ),
-        1 => ctx.create_logical_plan(
-            "select
-                l_returnflag,
-                l_linestatus,
-                sum(l_quantity) as sum_qty,
-                sum(l_extendedprice) as sum_base_price,
-                sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
-                sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as 
sum_charge,
-                avg(l_quantity) as avg_qty,
-                avg(l_extendedprice) as avg_price,
-                avg(l_discount) as avg_disc,
-                count(*) as count_order
-            from
-                lineitem
-            where
-                l_shipdate <= date '1998-09-02'
-            group by
-                l_returnflag,
-                l_linestatus
-            order by
-                l_returnflag,
-                l_linestatus;",
-        ),
-
-        2 => ctx.create_logical_plan(
-            "select
-                s_acctbal,
-                s_name,
-                n_name,
-                p_partkey,
-                p_mfgr,
-                s_address,
-                s_phone,
-                s_comment
-            from
-                part,
-                supplier,
-                partsupp,
-                nation,
-                region
-            where
-                p_partkey = ps_partkey
-                and s_suppkey = ps_suppkey
-                and p_size = 15
-                and p_type like '%BRASS'
-                and s_nationkey = n_nationkey
-                and n_regionkey = r_regionkey
-                and r_name = 'EUROPE'
-                and ps_supplycost = (
-                    select
-                        min(ps_supplycost)
-                    from
-                        partsupp,
-                        supplier,
-                        nation,
-                        region
-                    where
-                        p_partkey = ps_partkey
-                        and s_suppkey = ps_suppkey
-                        and s_nationkey = n_nationkey
-                        and n_regionkey = r_regionkey
-                        and r_name = 'EUROPE'
-                )
-            order by
-                s_acctbal desc,
-                n_name,
-                s_name,
-                p_partkey;",
-        ),
-
-        3 => ctx.create_logical_plan(
-            "select
-                l_orderkey,
-                sum(l_extendedprice * (1 - l_discount)) as revenue,
-                o_orderdate,
-                o_shippriority
-            from
-                customer,
-                orders,
-                lineitem
-            where
-                c_mktsegment = 'BUILDING'
-                and c_custkey = o_custkey
-                and l_orderkey = o_orderkey
-                and o_orderdate < date '1995-03-15'
-                and l_shipdate > date '1995-03-15'
-            group by
-                l_orderkey,
-                o_orderdate,
-                o_shippriority
-            order by
-                revenue desc,
-                o_orderdate;",
-        ),
-
-        4 => ctx.create_logical_plan(
-            "select
-                o_orderpriority,
-                count(*) as order_count
-            from
-                orders
-            where
-                o_orderdate >= '1993-07-01'
-                and o_orderdate < date '1993-07-01' + interval '3' month
-                and exists (
-                    select
-                        *
-                    from
-                        lineitem
-                    where
-                        l_orderkey = o_orderkey
-                        and l_commitdate < l_receiptdate
-                )
-            group by
-                o_orderpriority
-            order by
-                o_orderpriority;",
-        ),
-
-        // original
-        // 5 => ctx.create_logical_plan(
-        //     "select
-        //         n_name,
-        //         sum(l_extendedprice * (1 - l_discount)) as revenue
-        //     from
-        //         customer,
-        //         orders,
-        //         lineitem,
-        //         supplier,
-        //         nation,
-        //         region
-        //     where
-        //         c_custkey = o_custkey
-        //         and l_orderkey = o_orderkey
-        //         and l_suppkey = s_suppkey
-        //         and c_nationkey = s_nationkey
-        //         and s_nationkey = n_nationkey
-        //         and n_regionkey = r_regionkey
-        //         and r_name = 'ASIA'
-        //         and o_orderdate >= date '1994-01-01'
-        //         and o_orderdate < date '1994-01-01' + interval '1' year
-        //     group by
-        //         n_name
-        //     order by
-        //         revenue desc;"
-        // ),
-        5 => ctx.create_logical_plan(
-            "select
-                n_name,
-                sum(l_extendedprice * (1 - l_discount)) as revenue
-            from
-                customer,
-                orders,
-                lineitem,
-                supplier,
-                nation,
-                region
-            where
-                c_custkey = o_custkey
-                and l_orderkey = o_orderkey
-                and l_suppkey = s_suppkey
-                and c_nationkey = s_nationkey
-                and s_nationkey = n_nationkey
-                and n_regionkey = r_regionkey
-                and r_name = 'ASIA'
-                and o_orderdate >= date '1994-01-01'
-                and o_orderdate < date '1995-01-01'
-            group by
-                n_name
-            order by
-                revenue desc;",
-        ),
-
-        // original
-        // 6 => ctx.create_logical_plan(
-        //     "select
-        //         sum(l_extendedprice * l_discount) as revenue
-        //     from
-        //         lineitem
-        //     where
-        //         l_shipdate >= date '1994-01-01'
-        //         and l_shipdate < date '1994-01-01' + interval '1' year
-        //         and l_discount between .06 - 0.01 and .06 + 0.01
-        //         and l_quantity < 24;"
-        // ),
-        6 => ctx.create_logical_plan(
-            "select
-                sum(l_extendedprice * l_discount) as revenue
-            from
-                lineitem
-            where
-                l_shipdate >= date '1994-01-01'
-                and l_shipdate < date '1995-01-01'
-                and l_discount between .06 - 0.01 and .06 + 0.01
-                and l_quantity < 24;",
-        ),
-
-        7 => ctx.create_logical_plan(
-            "select
-                supp_nation,
-                cust_nation,
-                l_year,
-                sum(volume) as revenue
-            from
-                (
-                    select
-                        n1.n_name as supp_nation,
-                        n2.n_name as cust_nation,
-                        extract(year from l_shipdate) as l_year,
-                        l_extendedprice * (1 - l_discount) as volume
-                    from
-                        supplier,
-                        lineitem,
-                        orders,
-                        customer,
-                        nation n1,
-                        nation n2
-                    where
-                        s_suppkey = l_suppkey
-                        and o_orderkey = l_orderkey
-                        and c_custkey = o_custkey
-                        and s_nationkey = n1.n_nationkey
-                        and c_nationkey = n2.n_nationkey
-                        and (
-                            (n1.n_name = 'FRANCE' and n2.n_name = 'GERMANY')
-                            or (n1.n_name = 'GERMANY' and n2.n_name = 'FRANCE')
-                        )
-                        and l_shipdate between date '1995-01-01' and date 
'1996-12-31'
-                ) as shipping
-            group by
-                supp_nation,
-                cust_nation,
-                l_year
-            order by
-                supp_nation,
-                cust_nation,
-                l_year;",
-        ),
-
-        8 => ctx.create_logical_plan(
-            "select
-                o_year,
-                sum(case
-                    when nation = 'BRAZIL' then volume
-                    else 0
-                end) / sum(volume) as mkt_share
-            from
-                (
-                    select
-                        extract(year from o_orderdate) as o_year,
-                        l_extendedprice * (1 - l_discount) as volume,
-                        n2.n_name as nation
-                    from
-                        part,
-                        supplier,
-                        lineitem,
-                        orders,
-                        customer,
-                        nation n1,
-                        nation n2,
-                        region
-                    where
-                        p_partkey = l_partkey
-                        and s_suppkey = l_suppkey
-                        and l_orderkey = o_orderkey
-                        and o_custkey = c_custkey
-                        and c_nationkey = n1.n_nationkey
-                        and n1.n_regionkey = r_regionkey
-                        and r_name = 'AMERICA'
-                        and s_nationkey = n2.n_nationkey
-                        and o_orderdate between date '1995-01-01' and date 
'1996-12-31'
-                        and p_type = 'ECONOMY ANODIZED STEEL'
-                ) as all_nations
-            group by
-                o_year
-            order by
-                o_year;",
-        ),
-
-        9 => ctx.create_logical_plan(
-            "select
-                nation,
-                o_year,
-                sum(amount) as sum_profit
-            from
-                (
-                    select
-                        n_name as nation,
-                        extract(year from o_orderdate) as o_year,
-                        l_extendedprice * (1 - l_discount) - ps_supplycost * 
l_quantity as amount
-                    from
-                        part,
-                        supplier,
-                        lineitem,
-                        partsupp,
-                        orders,
-                        nation
-                    where
-                        s_suppkey = l_suppkey
-                        and ps_suppkey = l_suppkey
-                        and ps_partkey = l_partkey
-                        and p_partkey = l_partkey
-                        and o_orderkey = l_orderkey
-                        and s_nationkey = n_nationkey
-                        and p_name like '%green%'
-                ) as profit
-            group by
-                nation,
-                o_year
-            order by
-                nation,
-                o_year desc;",
-        ),
-
-        // 10 => ctx.create_logical_plan(
-        //     "select
-        //         c_custkey,
-        //         c_name,
-        //         sum(l_extendedprice * (1 - l_discount)) as revenue,
-        //         c_acctbal,
-        //         n_name,
-        //         c_address,
-        //         c_phone,
-        //         c_comment
-        //     from
-        //         customer,
-        //         orders,
-        //         lineitem,
-        //         nation
-        //     where
-        //         c_custkey = o_custkey
-        //         and l_orderkey = o_orderkey
-        //         and o_orderdate >= date '1993-10-01'
-        //         and o_orderdate < date '1993-10-01' + interval '3' month
-        //         and l_returnflag = 'R'
-        //         and c_nationkey = n_nationkey
-        //     group by
-        //         c_custkey,
-        //         c_name,
-        //         c_acctbal,
-        //         c_phone,
-        //         n_name,
-        //         c_address,
-        //         c_comment
-        //     order by
-        //         revenue desc;"
-        // ),
-        10 => ctx.create_logical_plan(
-            "select
-                c_custkey,
-                c_name,
-                sum(l_extendedprice * (1 - l_discount)) as revenue,
-                c_acctbal,
-                n_name,
-                c_address,
-                c_phone,
-                c_comment
-            from
-                customer,
-                orders,
-                lineitem,
-                nation
-            where
-                c_custkey = o_custkey
-                and l_orderkey = o_orderkey
-                and o_orderdate >= date '1993-10-01'
-                and o_orderdate < date '1994-01-01'
-                and l_returnflag = 'R'
-                and c_nationkey = n_nationkey
-            group by
-                c_custkey,
-                c_name,
-                c_acctbal,
-                c_phone,
-                n_name,
-                c_address,
-                c_comment
-            order by
-                revenue desc;",
-        ),
-
-        11 => ctx.create_logical_plan(
-            "select
-                ps_partkey,
-                sum(ps_supplycost * ps_availqty) as value
-            from
-                partsupp,
-                supplier,
-                nation
-            where
-                ps_suppkey = s_suppkey
-                and s_nationkey = n_nationkey
-                and n_name = 'GERMANY'
-            group by
-                ps_partkey having
-                    sum(ps_supplycost * ps_availqty) > (
-                        select
-                            sum(ps_supplycost * ps_availqty) * 0.0001
-                        from
-                            partsupp,
-                            supplier,
-                            nation
-                        where
-                            ps_suppkey = s_suppkey
-                            and s_nationkey = n_nationkey
-                            and n_name = 'GERMANY'
-                    )
-            order by
-                value desc;",
-        ),
-
-        // original
-        // 12 => ctx.create_logical_plan(
-        //     "select
-        //         l_shipmode,
-        //         sum(case
-        //             when o_orderpriority = '1-URGENT'
-        //                 or o_orderpriority = '2-HIGH'
-        //                 then 1
-        //             else 0
-        //         end) as high_line_count,
-        //         sum(case
-        //             when o_orderpriority <> '1-URGENT'
-        //                 and o_orderpriority <> '2-HIGH'
-        //                 then 1
-        //             else 0
-        //         end) as low_line_count
-        //     from
-        //         orders,
-        //         lineitem
-        //     where
-        //         o_orderkey = l_orderkey
-        //         and l_shipmode in ('MAIL', 'SHIP')
-        //         and l_commitdate < l_receiptdate
-        //         and l_shipdate < l_commitdate
-        //         and l_receiptdate >= date '1994-01-01'
-        //         and l_receiptdate < date '1994-01-01' + interval '1' year
-        //     group by
-        //         l_shipmode
-        //     order by
-        //         l_shipmode;"
-        // ),
-        12 => ctx.create_logical_plan(
-            "select
-                l_shipmode,
-                sum(case
-                    when o_orderpriority = '1-URGENT'
-                        or o_orderpriority = '2-HIGH'
-                        then 1
-                    else 0
-                end) as high_line_count,
-                sum(case
-                    when o_orderpriority <> '1-URGENT'
-                        and o_orderpriority <> '2-HIGH'
-                        then 1
-                    else 0
-                end) as low_line_count
-            from
-                lineitem
-            join
-                orders
-            on
-                l_orderkey = o_orderkey
-            where
-                l_shipmode in ('MAIL', 'SHIP')
-                and l_commitdate < l_receiptdate
-                and l_shipdate < l_commitdate
-                and l_receiptdate >= date '1994-01-01'
-                and l_receiptdate < date '1995-01-01'
-            group by
-                l_shipmode
-            order by
-                l_shipmode;",
-        ),
-
-        13 => ctx.create_logical_plan(
-            "select
-                c_count,
-                count(*) as custdist
-            from
-                (
-                    select
-                        c_custkey,
-                        count(o_orderkey)
-                    from
-                        customer left outer join orders on
-                            c_custkey = o_custkey
-                            and o_comment not like '%special%requests%'
-                    group by
-                        c_custkey
-                ) as c_orders (c_custkey, c_count)
-            group by
-                c_count
-            order by
-                custdist desc,
-                c_count desc;",
-        ),
-
-        14 => ctx.create_logical_plan(
-            "select
-                100.00 * sum(case
-                    when p_type like 'PROMO%'
-                        then l_extendedprice * (1 - l_discount)
-                    else 0
-                end) / sum(l_extendedprice * (1 - l_discount)) as promo_revenue
-            from
-                lineitem,
-                part
-            where
-                l_partkey = p_partkey
-                and l_shipdate >= date '1995-09-01'
-                and l_shipdate < date '1995-10-01';",
-        ),
-
-        15 => ctx.create_logical_plan(
-            "create view revenue0 (supplier_no, total_revenue) as
-                select
-                    l_suppkey,
-                    sum(l_extendedprice * (1 - l_discount))
-                from
-                    lineitem
-                where
-                    l_shipdate >= date '1996-01-01'
-                    and l_shipdate < date '1996-01-01' + interval '3' month
-                group by
-                    l_suppkey;
-
-            select
-                s_suppkey,
-                s_name,
-                s_address,
-                s_phone,
-                total_revenue
-            from
-                supplier,
-                revenue0
-            where
-                s_suppkey = supplier_no
-                and total_revenue = (
-                    select
-                        max(total_revenue)
-                    from
-                        revenue0
-                )
-            order by
-                s_suppkey;
-
-            drop view revenue0;",
-        ),
-
-        16 => ctx.create_logical_plan(
-            "select
-                p_brand,
-                p_type,
-                p_size,
-                count(distinct ps_suppkey) as supplier_cnt
-            from
-                partsupp,
-                part
-            where
-                p_partkey = ps_partkey
-                and p_brand <> 'Brand#45'
-                and p_type not like 'MEDIUM POLISHED%'
-                and p_size in (49, 14, 23, 45, 19, 3, 36, 9)
-                and ps_suppkey not in (
-                    select
-                        s_suppkey
-                    from
-                        supplier
-                    where
-                        s_comment like '%Customer%Complaints%'
-                )
-            group by
-                p_brand,
-                p_type,
-                p_size
-            order by
-                supplier_cnt desc,
-                p_brand,
-                p_type,
-                p_size;",
-        ),
-
-        17 => ctx.create_logical_plan(
-            "select
-                sum(l_extendedprice) / 7.0 as avg_yearly
-            from
-                lineitem,
-                part
-            where
-                p_partkey = l_partkey
-                and p_brand = 'Brand#23'
-                and p_container = 'MED BOX'
-                and l_quantity < (
-                    select
-                        0.2 * avg(l_quantity)
-                    from
-                        lineitem
-                    where
-                        l_partkey = p_partkey
-                );",
-        ),
-
-        18 => ctx.create_logical_plan(
-            "select
-                c_name,
-                c_custkey,
-                o_orderkey,
-                o_orderdate,
-                o_totalprice,
-                sum(l_quantity)
-            from
-                customer,
-                orders,
-                lineitem
-            where
-                o_orderkey in (
-                    select
-                        l_orderkey
-                    from
-                        lineitem
-                    group by
-                        l_orderkey having
-                            sum(l_quantity) > 300
-                )
-                and c_custkey = o_custkey
-                and o_orderkey = l_orderkey
-            group by
-                c_name,
-                c_custkey,
-                o_orderkey,
-                o_orderdate,
-                o_totalprice
-            order by
-                o_totalprice desc,
-                o_orderdate;",
-        ),
-
-        19 => ctx.create_logical_plan(
-            "select
-                sum(l_extendedprice* (1 - l_discount)) as revenue
-            from
-                lineitem,
-                part
-            where
-                (
-                    p_partkey = l_partkey
-                    and p_brand = 'Brand#12'
-                    and p_container in ('SM CASE', 'SM BOX', 'SM PACK', 'SM PKG')
-                    and l_quantity >= 1 and l_quantity <= 1 + 10
-                    and p_size between 1 and 5
-                    and l_shipmode in ('AIR', 'AIR REG')
-                    and l_shipinstruct = 'DELIVER IN PERSON'
-                )
-                or
-                (
-                    p_partkey = l_partkey
-                    and p_brand = 'Brand#23'
-                    and p_container in ('MED BAG', 'MED BOX', 'MED PKG', 'MED PACK')
-                    and l_quantity >= 10 and l_quantity <= 10 + 10
-                    and p_size between 1 and 10
-                    and l_shipmode in ('AIR', 'AIR REG')
-                    and l_shipinstruct = 'DELIVER IN PERSON'
-                )
-                or
-                (
-                    p_partkey = l_partkey
-                    and p_brand = 'Brand#34'
-                    and p_container in ('LG CASE', 'LG BOX', 'LG PACK', 'LG PKG')
-                    and l_quantity >= 20 and l_quantity <= 20 + 10
-                    and p_size between 1 and 15
-                    and l_shipmode in ('AIR', 'AIR REG')
-                    and l_shipinstruct = 'DELIVER IN PERSON'
-                );",
-        ),
-
-        20 => ctx.create_logical_plan(
-            "select
-                s_name,
-                s_address
-            from
-                supplier,
-                nation
-            where
-                s_suppkey in (
-                    select
-                        ps_suppkey
-                    from
-                        partsupp
-                    where
-                        ps_partkey in (
-                            select
-                                p_partkey
-                            from
-                                part
-                            where
-                                p_name like 'forest%'
-                        )
-                        and ps_availqty > (
-                            select
-                                0.5 * sum(l_quantity)
-                            from
-                                lineitem
-                            where
-                                l_partkey = ps_partkey
-                                and l_suppkey = ps_suppkey
-                                and l_shipdate >= date '1994-01-01'
-                                and l_shipdate < 'date 1994-01-01' + interval '1' year
-                        )
-                )
-                and s_nationkey = n_nationkey
-                and n_name = 'CANADA'
-            order by
-                s_name;",
-        ),
-
-        21 => ctx.create_logical_plan(
-            "select
-                s_name,
-                count(*) as numwait
-            from
-                supplier,
-                lineitem l1,
-                orders,
-                nation
-            where
-                s_suppkey = l1.l_suppkey
-                and o_orderkey = l1.l_orderkey
-                and o_orderstatus = 'F'
-                and l1.l_receiptdate > l1.l_commitdate
-                and exists (
-                    select
-                        *
-                    from
-                        lineitem l2
-                    where
-                        l2.l_orderkey = l1.l_orderkey
-                        and l2.l_suppkey <> l1.l_suppkey
-                )
-                and not exists (
-                    select
-                        *
-                    from
-                        lineitem l3
-                    where
-                        l3.l_orderkey = l1.l_orderkey
-                        and l3.l_suppkey <> l1.l_suppkey
-                        and l3.l_receiptdate > l3.l_commitdate
-                )
-                and s_nationkey = n_nationkey
-                and n_name = 'SAUDI ARABIA'
-            group by
-                s_name
-            order by
-                numwait desc,
-                s_name;",
-        ),
-
-        22 => ctx.create_logical_plan(
-            "select
-                cntrycode,
-                count(*) as numcust,
-                sum(c_acctbal) as totacctbal
-            from
-                (
-                    select
-                        substring(c_phone from 1 for 2) as cntrycode,
-                        c_acctbal
-                    from
-                        customer
-                    where
-                        substring(c_phone from 1 for 2) in
-                            ('13', '31', '23', '29', '30', '18', '17')
-                        and c_acctbal > (
-                            select
-                                avg(c_acctbal)
-                            from
-                                customer
-                            where
-                                c_acctbal > 0.00
-                                and substring(c_phone from 1 for 2) in
-                                    ('13', '31', '23', '29', '30', '18', '17')
-                        )
-                        and not exists (
-                            select
-                                *
-                            from
-                                orders
-                            where
-                                o_custkey = c_custkey
-                        )
-                ) as custsale
-            group by
-                cntrycode
-            order by
-                cntrycode;",
-        ),
-
-        _ => unimplemented!("invalid query. Expected value between 1 and 22"),
+async fn benchmark_ballista(opt: BenchmarkOpt) -> Result<()> {
+    println!("Running benchmarks with the following options: {:?}", opt);
+
+    let mut settings = HashMap::new();
+    settings.insert("batch.size".to_owned(), format!("{}", opt.batch_size));
+
+    let ctx =
+        BallistaContext::remote(opt.host.unwrap().as_str(), opt.port.unwrap(), settings);
+
+    // register tables with Ballista context
+    let path = opt.path.to_str().unwrap();
+    let file_format = opt.file_format.as_str();
+    for table in TABLES {
+        match file_format {
+            // dbgen creates .tbl ('|' delimited) files without header
+            "tbl" => {
+                let path = format!("{}/{}.tbl", path, table);
+                let schema = get_schema(table);
+                let options = CsvReadOptions::new()
+                    .schema(&schema)
+                    .delimiter(b'|')
+                    .has_header(false)
+                    .file_extension(".tbl");
+                ctx.register_csv(table, &path, options)
+                    .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))?;
+            }
+            "csv" => {
+                let path = format!("{}/{}", path, table);
+                let schema = get_schema(table);
+                let options = CsvReadOptions::new().schema(&schema).has_header(true);
+                ctx.register_csv(table, &path, options)
+                    .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))?;
+            }
+            "parquet" => {
+                let path = format!("{}/{}", path, table);
+                ctx.register_parquet(table, &path)
+                    .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))?;
+            }
+            other => {
+                unimplemented!("Invalid file format '{}'", other);
+            }
+        }
     }
+
+    let mut millis = vec![];
+
+    // run benchmark
+    let sql = get_query_sql(opt.query)?;
+    println!("Running benchmark with query {}:\n {}", opt.query, sql);
+    for i in 0..opt.iterations {
+        let start = Instant::now();
+        let df = ctx
+            .sql(&sql)
+            .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))?;
+        let mut batches = vec![];
+        let mut stream = df
+            .collect()
+            .await
+            .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))?;
+        while let Some(result) = stream.next().await {
+            let batch = result?;
+            batches.push(batch);
+        }
+        let elapsed = start.elapsed().as_secs_f64() * 1000.0;
+        millis.push(elapsed as f64);
+        println!("Query {} iteration {} took {:.1} ms", opt.query, i, elapsed);
+        if opt.debug {
+            pretty::print_batches(&batches)?;
+        }
+    }
+
+    let avg = millis.iter().sum::<f64>() / millis.len() as f64;
+    println!("Query {} avg time: {:.2} ms", opt.query, avg);
+
+    Ok(())
+}
+
+fn get_query_sql(query: usize) -> Result<String> {
+    if query > 0 && query < 23 {
+        let filename = format!("queries/q{}.sql", query);
+        Ok(fs::read_to_string(&filename).expect("failed to read query"))
+    } else {
+        Err(DataFusionError::Plan(
+            "invalid query. Expected value between 1 and 22".to_owned(),
+        ))
+    }
+}
+
+fn create_logical_plan(ctx: &mut ExecutionContext, query: usize) -> Result<LogicalPlan> {
+    let sql = get_query_sql(query)?;
+    ctx.create_logical_plan(&sql)
 }
 
 async fn execute_query(
@@ -1668,8 +954,10 @@ mod tests {
                 file_format: "tbl".to_string(),
                 mem_table: false,
                 partitions: 16,
+                host: None,
+                port: None,
             };
-            let actual = benchmark(opt).await?;
+            let actual = benchmark_datafusion(opt).await?;
 
             // assert schema equality without comparing nullable values
             assert_eq!(
diff --git a/ballista/rust/benchmarks/tpch/tpch-gen.sh b/benchmarks/tpch-gen.sh
similarity index 97%
rename from ballista/rust/benchmarks/tpch/tpch-gen.sh
rename to benchmarks/tpch-gen.sh
index f5147f5..fef3480 100755
--- a/ballista/rust/benchmarks/tpch/tpch-gen.sh
+++ b/benchmarks/tpch-gen.sh
@@ -16,7 +16,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-BALLISTA_VERSION=0.4.2-SNAPSHOT
+BALLISTA_VERSION=0.5.0-SNAPSHOT
 
 #set -e
 
diff --git a/ballista/rust/benchmarks/tpch/tpchgen.dockerfile b/benchmarks/tpchgen.dockerfile
similarity index 100%
rename from ballista/rust/benchmarks/tpch/tpchgen.dockerfile
rename to benchmarks/tpchgen.dockerfile
diff --git a/dev/build-rust-base.sh b/dev/build-rust-base.sh
index e424909..1bedbd8 100755
--- a/dev/build-rust-base.sh
+++ b/dev/build-rust-base.sh
@@ -16,6 +16,6 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-BALLISTA_VERSION=0.4.2-SNAPSHOT
+BALLISTA_VERSION=0.5.0-SNAPSHOT
 set -e
docker build -t ballistacompute/rust-base:$BALLISTA_VERSION -f dev/docker/rust-base.dockerfile .
diff --git a/dev/build-rust.sh b/dev/build-rust.sh
index d31c524..5777d1e 100755
--- a/dev/build-rust.sh
+++ b/dev/build-rust.sh
@@ -17,7 +17,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-BALLISTA_VERSION=0.4.2-SNAPSHOT
+BALLISTA_VERSION=0.5.0-SNAPSHOT
 
 set -e
 
diff --git a/dev/docker/rust.dockerfile b/dev/docker/rust.dockerfile
index 6505f3c..ba713b1 100644
--- a/dev/docker/rust.dockerfile
+++ b/dev/docker/rust.dockerfile
@@ -22,7 +22,7 @@
 # as a mounted directory.
 
 ARG RELEASE_FLAG=--release
-FROM ballistacompute/rust-base:0.4.2-SNAPSHOT AS base
+FROM ballistacompute/rust-base:0.5.0-SNAPSHOT AS base
 WORKDIR /tmp/ballista
 RUN apt-get -y install cmake
 RUN cargo install cargo-chef 
@@ -73,7 +73,7 @@ ENV RELEASE_FLAG=${RELEASE_FLAG}
RUN if [ -z "$RELEASE_FLAG" ]; then mv /tmp/ballista/target/debug/tpch /tpch; else mv /tmp/ballista/target/release/tpch /tpch; fi
 
 # Copy the binary into a new container for a smaller docker image
-FROM ballistacompute/rust-base:0.4.0-20210213
+FROM ballistacompute/rust-base:0.5.0-SNAPSHOT
 
 COPY --from=builder /executor /
 
@@ -81,6 +81,10 @@ COPY --from=builder /scheduler /
 
 COPY --from=builder /tpch /
 
+ADD benchmarks/run.sh /
+RUN mkdir /queries
+COPY benchmarks/queries/ /queries/
+
 ENV RUST_LOG=info
 ENV RUST_BACKTRACE=full
 
diff --git a/dev/integration-tests.sh b/dev/integration-tests.sh
index 6ed764e..06ab108 100755
--- a/dev/integration-tests.sh
+++ b/dev/integration-tests.sh
@@ -19,11 +19,11 @@
 set -e
 ./dev/build-rust-base.sh
 ./dev/build-rust.sh
-pushd ballista/rust/benchmarks/tpch
+pushd benchmarks
 ./tpch-gen.sh
 
 docker-compose up -d
-docker-compose run ballista-client ./run.sh
+docker-compose run ballista-client /run.sh
 docker-compose down
 
 popd
diff --git a/dev/release/rat_exclude_files.txt b/dev/release/rat_exclude_files.txt
index f9eca7a..cef0a91 100644
--- a/dev/release/rat_exclude_files.txt
+++ b/dev/release/rat_exclude_files.txt
@@ -100,6 +100,6 @@ requirements.txt
 *.scss
 .gitattributes
 rust-toolchain
-ballista/rust/benchmarks/tpch/queries/q*.sql
+benchmarks/queries/q*.sql
 ballista/rust/scheduler/testdata/*
 ballista/ui/scheduler/yarn.lock

Reply via email to