This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 32951c3 Consolidate benchmarks (#34)
32951c3 is described below
commit 32951c3c1c62e0ceef5aca200e81d2ea9ea124a7
Author: Andy Grove <[email protected]>
AuthorDate: Fri Apr 23 13:34:40 2021 -0600
Consolidate benchmarks (#34)
---
.dockerignore | 2 +
Cargo.toml | 1 -
ballista/rust/benchmarks/tpch/Cargo.toml | 36 -
ballista/rust/benchmarks/tpch/README.md | 104 ---
ballista/rust/benchmarks/tpch/src/main.rs | 360 --------
.../benchmarks/tpch => benchmarks}/.dockerignore | 0
.../rust/benchmarks/tpch => benchmarks}/.gitignore | 0
benchmarks/Cargo.toml | 1 +
benchmarks/README.md | 106 ++-
.../tpch => benchmarks}/docker-compose.yaml | 8 +-
.../benchmarks/tpch => benchmarks}/entrypoint.sh | 0
.../benchmarks/tpch => benchmarks}/queries/q1.sql | 0
.../benchmarks/tpch => benchmarks}/queries/q10.sql | 0
.../benchmarks/tpch => benchmarks}/queries/q11.sql | 0
.../benchmarks/tpch => benchmarks}/queries/q12.sql | 0
.../benchmarks/tpch => benchmarks}/queries/q13.sql | 0
.../benchmarks/tpch => benchmarks}/queries/q14.sql | 0
.../benchmarks/tpch => benchmarks}/queries/q16.sql | 0
.../benchmarks/tpch => benchmarks}/queries/q17.sql | 0
.../benchmarks/tpch => benchmarks}/queries/q18.sql | 0
.../benchmarks/tpch => benchmarks}/queries/q19.sql | 0
.../benchmarks/tpch => benchmarks}/queries/q2.sql | 0
.../benchmarks/tpch => benchmarks}/queries/q20.sql | 0
.../benchmarks/tpch => benchmarks}/queries/q21.sql | 0
.../benchmarks/tpch => benchmarks}/queries/q22.sql | 0
.../benchmarks/tpch => benchmarks}/queries/q3.sql | 0
.../benchmarks/tpch => benchmarks}/queries/q4.sql | 0
.../benchmarks/tpch => benchmarks}/queries/q5.sql | 0
.../benchmarks/tpch => benchmarks}/queries/q6.sql | 0
.../benchmarks/tpch => benchmarks}/queries/q7.sql | 0
.../benchmarks/tpch => benchmarks}/queries/q8.sql | 0
.../benchmarks/tpch => benchmarks}/queries/q9.sql | 0
.../rust/benchmarks/tpch => benchmarks}/run.sh | 1 +
benchmarks/src/bin/tpch.rs | 950 +++------------------
.../benchmarks/tpch => benchmarks}/tpch-gen.sh | 2 +-
.../tpch => benchmarks}/tpchgen.dockerfile | 0
dev/build-rust-base.sh | 2 +-
dev/build-rust.sh | 2 +-
dev/docker/rust.dockerfile | 8 +-
dev/integration-tests.sh | 4 +-
dev/release/rat_exclude_files.txt | 2 +-
41 files changed, 223 insertions(+), 1366 deletions(-)
diff --git a/.dockerignore b/.dockerignore
index 9a64a12..8cd6a89 100644
--- a/.dockerignore
+++ b/.dockerignore
@@ -23,4 +23,6 @@
ci
dev
+testing
+parquet-testing
**/target/*
diff --git a/Cargo.toml b/Cargo.toml
index 0947bea..2f34bab 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -20,7 +20,6 @@ members = [
"datafusion",
"datafusion-examples",
"benchmarks",
- "ballista/rust/benchmarks/tpch",
"ballista/rust/client",
"ballista/rust/core",
"ballista/rust/executor",
diff --git a/ballista/rust/benchmarks/tpch/Cargo.toml
b/ballista/rust/benchmarks/tpch/Cargo.toml
deleted file mode 100644
index 9311f23..0000000
--- a/ballista/rust/benchmarks/tpch/Cargo.toml
+++ /dev/null
@@ -1,36 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-[package]
-name = "tpch"
-version = "0.5.0-SNAPSHOT"
-homepage = "https://github.com/apache/arrow"
-repository = "https://github.com/apache/arrow"
-authors = ["Apache Arrow <[email protected]>"]
-license = "Apache-2.0"
-edition = "2018"
-
-[dependencies]
-ballista = { path="../../client" }
-datafusion = { path = "../../../../datafusion" }
-
-arrow = { git = "https://github.com/apache/arrow-rs", rev =
"c3fe3bab9905739fdda75301dab07a18c91731bd" }
-parquet = { git = "https://github.com/apache/arrow-rs", rev =
"c3fe3bab9905739fdda75301dab07a18c91731bd" }
-
-env_logger = "0.8"
-tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread"] }
-structopt = "0.3"
diff --git a/ballista/rust/benchmarks/tpch/README.md
b/ballista/rust/benchmarks/tpch/README.md
deleted file mode 100644
index 20c4fc7..0000000
--- a/ballista/rust/benchmarks/tpch/README.md
+++ /dev/null
@@ -1,104 +0,0 @@
-<!---
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
--->
-
-# TPC-H Benchmarks
-
-TPC-H is an industry standard benchmark for testing databases and query
engines. A command-line tool is available that
-can generate the raw test data at any given scale factor (scale factor refers
to the amount of data to be generated).
-
-## Generating Test Data
-
-TPC-H data can be generated using the `tpch-gen.sh` script, which creates a
Docker image containing the TPC-DS data
-generator.
-
-```bash
-./tpch-gen.sh
-```
-
-Data will be generated into the `data` subdirectory and will not be checked in
because this directory has been added
-to the `.gitignore` file.
-
-## Running the Benchmarks
-
-To run the benchmarks it is necessary to have at least one Ballista scheduler
and one Ballista executor running.
-
-To run the scheduler from source:
-
-```bash
-cd $ARROW_HOME/ballista/rust/scheduler
-RUST_LOG=info cargo run --release
-```
-
-By default the scheduler will bind to `0.0.0.0` and listen on port 50050.
-
-To run the executor from source:
-
-```bash
-cd $ARROW_HOME/ballista/rust/executor
-RUST_LOG=info cargo run --release
-```
-
-By default the executor will bind to `0.0.0.0` and listen on port 50051.
-
-You can add SIMD/snmalloc/LTO flags to improve speed (with longer build times):
-
-```
-RUST_LOG=info RUSTFLAGS='-C target-cpu=native -C lto -C codegen-units=1 -C
embed-bitcode' cargo run --release --bin executor --features "simd snmalloc"
--target x86_64-unknown-linux-gnu
-```
-
-To run the benchmarks:
-
-```bash
-cd $ARROW_HOME/ballista/rust/benchmarks/tpch
-cargo run --release benchmark --host localhost --port 50050 --query 1 --path
$(pwd)/data --format tbl
-```
-
-## Running the Benchmarks on docker-compose
-
-To start a Rust scheduler and executor using Docker Compose:
-
-```bash
-cd $BALLISTA_HOME
-./dev/build-rust.sh
-cd $BALLISTA_HOME/rust/benchmarks/tpch
-docker-compose up
-```
-
-Then you can run the benchmark with:
-
-```bash
-docker-compose run ballista-client cargo run benchmark --host
ballista-scheduler --port 50050 --query 1 --path /data --format tbl
-```
-
-## Expected output
-
-The result of query 1 should produce the following output when executed
against the SF=1 dataset.
-
-```
-+--------------+--------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------+-------------+
-| l_returnflag | l_linestatus | sum_qty | sum_base_price | sum_disc_price
| sum_charge | avg_qty | avg_price | avg_disc
| count_order |
-+--------------+--------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------+-------------+
-| A | F | 37734107 | 56586554400.73001 |
53758257134.870026 | 55909065222.82768 | 25.522005853257337 |
38273.12973462168 | 0.049985295838396455 | 1478493 |
-| N | F | 991417 | 1487504710.3799996 |
1413082168.0541 | 1469649223.1943746 | 25.516471920522985 |
38284.467760848296 | 0.05009342667421622 | 38854 |
-| N | O | 74476023 | 111701708529.50996 |
106118209986.10472 | 110367023144.56622 | 25.502229680934594 | 38249.1238377803
| 0.049996589476752576 | 2920373 |
-| R | F | 37719753 | 56568041380.90001 |
53741292684.60399 | 55889619119.83194 | 25.50579361269077 |
38250.854626099666 | 0.05000940583012587 | 1478870 |
-+--------------+--------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------+-------------+
-Query 1 iteration 0 took 1956.1 ms
-Query 1 avg time: 1956.11 ms
-```
diff --git a/ballista/rust/benchmarks/tpch/src/main.rs
b/ballista/rust/benchmarks/tpch/src/main.rs
deleted file mode 100644
index 1ba46ea..0000000
--- a/ballista/rust/benchmarks/tpch/src/main.rs
+++ /dev/null
@@ -1,360 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Benchmark derived from TPC-H. This is not an official TPC-H benchmark.
-//!
-//! This is a modified version of the DataFusion version of these benchmarks.
-
-use std::collections::HashMap;
-use std::fs;
-use std::path::{Path, PathBuf};
-use std::time::Instant;
-
-use arrow::datatypes::{DataType, Field, Schema};
-use arrow::util::pretty;
-use ballista::prelude::*;
-use datafusion::prelude::*;
-use parquet::basic::Compression;
-use parquet::file::properties::WriterProperties;
-use structopt::StructOpt;
-
-#[derive(Debug, StructOpt)]
-struct BenchmarkOpt {
- /// Ballista executor host
- #[structopt(long = "host")]
- host: String,
-
- /// Ballista executor port
- #[structopt(long = "port")]
- port: u16,
-
- /// Query number
- #[structopt(long)]
- query: usize,
-
- /// Activate debug mode to see query results
- #[structopt(long)]
- debug: bool,
-
- /// Number of iterations of each test run
- #[structopt(long = "iterations", default_value = "1")]
- iterations: usize,
-
- /// Batch size when reading CSV or Parquet files
- #[structopt(long = "batch-size", default_value = "32768")]
- batch_size: usize,
-
- /// Path to data files
- #[structopt(parse(from_os_str), required = true, long = "path")]
- path: PathBuf,
-
- /// File format: `csv`, `tbl` or `parquet`
- #[structopt(long = "format")]
- file_format: String,
-}
-
-#[derive(Debug, StructOpt)]
-struct ConvertOpt {
- /// Path to csv files
- #[structopt(parse(from_os_str), required = true, short = "i", long =
"input")]
- input_path: PathBuf,
-
- /// Output path
- #[structopt(parse(from_os_str), required = true, short = "o", long =
"output")]
- output_path: PathBuf,
-
- /// Output file format: `csv` or `parquet`
- #[structopt(short = "f", long = "format")]
- file_format: String,
-
- /// Compression to use when writing Parquet files
- #[structopt(short = "c", long = "compression", default_value = "snappy")]
- compression: String,
-
- /// Number of partitions to produce
- #[structopt(short = "p", long = "partitions", default_value = "1")]
- partitions: usize,
-
- /// Batch size when reading CSV or Parquet files
- #[structopt(short = "s", long = "batch-size", default_value = "4096")]
- batch_size: usize,
-}
-
-#[derive(Debug, StructOpt)]
-#[structopt(name = "TPC-H", about = "TPC-H Benchmarks.")]
-enum TpchOpt {
- Benchmark(BenchmarkOpt),
- Convert(ConvertOpt),
-}
-
-const TABLES: &[&str] = &[
- "part", "supplier", "partsupp", "customer", "orders", "lineitem",
"nation", "region",
-];
-
-#[tokio::main]
-async fn main() -> Result<()> {
- env_logger::init();
- match TpchOpt::from_args() {
- TpchOpt::Benchmark(opt) => benchmark(opt).await.map(|_| ()),
- TpchOpt::Convert(opt) => convert_tbl(opt).await,
- }
-}
-
-async fn benchmark(opt: BenchmarkOpt) -> Result<()> {
- println!("Running benchmarks with the following options: {:?}", opt);
-
- let mut settings = HashMap::new();
- settings.insert("batch.size".to_owned(), format!("{}", opt.batch_size));
-
- let ctx = BallistaContext::remote(opt.host.as_str(), opt.port, settings);
-
- // register tables with Ballista context
- let path = opt.path.to_str().unwrap();
- let file_format = opt.file_format.as_str();
- for table in TABLES {
- match file_format {
- // dbgen creates .tbl ('|' delimited) files without header
- "tbl" => {
- let path = format!("{}/{}.tbl", path, table);
- let schema = get_schema(table);
- let options = CsvReadOptions::new()
- .schema(&schema)
- .delimiter(b'|')
- .has_header(false)
- .file_extension(".tbl");
- ctx.register_csv(table, &path, options)?;
- }
- "csv" => {
- let path = format!("{}/{}", path, table);
- let schema = get_schema(table);
- let options =
CsvReadOptions::new().schema(&schema).has_header(true);
- ctx.register_csv(table, &path, options)?;
- }
- "parquet" => {
- let path = format!("{}/{}", path, table);
- ctx.register_parquet(table, &path)?;
- }
- other => {
- unimplemented!("Invalid file format '{}'", other);
- }
- }
- }
-
- let mut millis = vec![];
-
- // run benchmark
- let sql = get_query_sql(opt.query)?;
- println!("Running benchmark with query {}:\n {}", opt.query, sql);
- for i in 0..opt.iterations {
- let start = Instant::now();
- let df = ctx.sql(&sql)?;
- let mut batches = vec![];
- let mut stream = df.collect().await?;
- while let Some(result) = stream.next().await {
- let batch = result?;
- batches.push(batch);
- }
- let elapsed = start.elapsed().as_secs_f64() * 1000.0;
- millis.push(elapsed as f64);
- println!("Query {} iteration {} took {:.1} ms", opt.query, i, elapsed);
- if opt.debug {
- pretty::print_batches(&batches)?;
- }
- }
-
- let avg = millis.iter().sum::<f64>() / millis.len() as f64;
- println!("Query {} avg time: {:.2} ms", opt.query, avg);
-
- Ok(())
-}
-
-fn get_query_sql(query: usize) -> Result<String> {
- if query > 0 && query < 23 {
- let filename = format!("queries/q{}.sql", query);
- Ok(fs::read_to_string(&filename).expect("failed to read query"))
- } else {
- Err(BallistaError::General(
- "invalid query. Expected value between 1 and 22".to_owned(),
- ))
- }
-}
-
-async fn convert_tbl(opt: ConvertOpt) -> Result<()> {
- let output_root_path = Path::new(&opt.output_path);
- for table in TABLES {
- let start = Instant::now();
- let schema = get_schema(table);
-
- let input_path = format!("{}/{}.tbl",
opt.input_path.to_str().unwrap(), table);
- let options = CsvReadOptions::new()
- .schema(&schema)
- .delimiter(b'|')
- .file_extension(".tbl");
-
- let config = ExecutionConfig::new().with_batch_size(opt.batch_size);
- let mut ctx = ExecutionContext::with_config(config);
-
- // build plan to read the TBL file
- let mut csv = ctx.read_csv(&input_path, options)?;
-
- // optionally, repartition the file
- if opt.partitions > 1 {
- csv =
csv.repartition(Partitioning::RoundRobinBatch(opt.partitions))?
- }
-
- // create the physical plan
- let csv = csv.to_logical_plan();
- let csv = ctx.optimize(&csv)?;
- let csv = ctx.create_physical_plan(&csv)?;
-
- let output_path = output_root_path.join(table);
- let output_path = output_path.to_str().unwrap().to_owned();
-
- println!(
- "Converting '{}' to {} files in directory '{}'",
- &input_path, &opt.file_format, &output_path
- );
- match opt.file_format.as_str() {
- "csv" => ctx.write_csv(csv, output_path).await?,
- "parquet" => {
- let compression = match opt.compression.as_str() {
- "none" => Compression::UNCOMPRESSED,
- "snappy" => Compression::SNAPPY,
- "brotli" => Compression::BROTLI,
- "gzip" => Compression::GZIP,
- "lz4" => Compression::LZ4,
- "lz0" => Compression::LZO,
- "zstd" => Compression::ZSTD,
- other => {
- return Err(BallistaError::NotImplemented(format!(
- "Invalid compression format: {}",
- other
- )))
- }
- };
- let props = WriterProperties::builder()
- .set_compression(compression)
- .build();
- ctx.write_parquet(csv, output_path, Some(props)).await?
- }
- other => {
- return Err(BallistaError::NotImplemented(format!(
- "Invalid output format: {}",
- other
- )))
- }
- }
- println!("Conversion completed in {} ms", start.elapsed().as_millis());
- }
-
- Ok(())
-}
-
-fn get_schema(table: &str) -> Schema {
- // note that the schema intentionally uses signed integers so that any
generated Parquet
- // files can also be used to benchmark tools that only support signed
integers, such as
- // Apache Spark
-
- match table {
- "part" => Schema::new(vec![
- Field::new("p_partkey", DataType::Int32, false),
- Field::new("p_name", DataType::Utf8, false),
- Field::new("p_mfgr", DataType::Utf8, false),
- Field::new("p_brand", DataType::Utf8, false),
- Field::new("p_type", DataType::Utf8, false),
- Field::new("p_size", DataType::Int32, false),
- Field::new("p_container", DataType::Utf8, false),
- Field::new("p_retailprice", DataType::Float64, false),
- Field::new("p_comment", DataType::Utf8, false),
- ]),
-
- "supplier" => Schema::new(vec![
- Field::new("s_suppkey", DataType::Int32, false),
- Field::new("s_name", DataType::Utf8, false),
- Field::new("s_address", DataType::Utf8, false),
- Field::new("s_nationkey", DataType::Int32, false),
- Field::new("s_phone", DataType::Utf8, false),
- Field::new("s_acctbal", DataType::Float64, false),
- Field::new("s_comment", DataType::Utf8, false),
- ]),
-
- "partsupp" => Schema::new(vec![
- Field::new("ps_partkey", DataType::Int32, false),
- Field::new("ps_suppkey", DataType::Int32, false),
- Field::new("ps_availqty", DataType::Int32, false),
- Field::new("ps_supplycost", DataType::Float64, false),
- Field::new("ps_comment", DataType::Utf8, false),
- ]),
-
- "customer" => Schema::new(vec![
- Field::new("c_custkey", DataType::Int32, false),
- Field::new("c_name", DataType::Utf8, false),
- Field::new("c_address", DataType::Utf8, false),
- Field::new("c_nationkey", DataType::Int32, false),
- Field::new("c_phone", DataType::Utf8, false),
- Field::new("c_acctbal", DataType::Float64, false),
- Field::new("c_mktsegment", DataType::Utf8, false),
- Field::new("c_comment", DataType::Utf8, false),
- ]),
-
- "orders" => Schema::new(vec![
- Field::new("o_orderkey", DataType::Int32, false),
- Field::new("o_custkey", DataType::Int32, false),
- Field::new("o_orderstatus", DataType::Utf8, false),
- Field::new("o_totalprice", DataType::Float64, false),
- Field::new("o_orderdate", DataType::Date32, false),
- Field::new("o_orderpriority", DataType::Utf8, false),
- Field::new("o_clerk", DataType::Utf8, false),
- Field::new("o_shippriority", DataType::Int32, false),
- Field::new("o_comment", DataType::Utf8, false),
- ]),
-
- "lineitem" => Schema::new(vec![
- Field::new("l_orderkey", DataType::Int32, false),
- Field::new("l_partkey", DataType::Int32, false),
- Field::new("l_suppkey", DataType::Int32, false),
- Field::new("l_linenumber", DataType::Int32, false),
- Field::new("l_quantity", DataType::Float64, false),
- Field::new("l_extendedprice", DataType::Float64, false),
- Field::new("l_discount", DataType::Float64, false),
- Field::new("l_tax", DataType::Float64, false),
- Field::new("l_returnflag", DataType::Utf8, false),
- Field::new("l_linestatus", DataType::Utf8, false),
- Field::new("l_shipdate", DataType::Date32, false),
- Field::new("l_commitdate", DataType::Date32, false),
- Field::new("l_receiptdate", DataType::Date32, false),
- Field::new("l_shipinstruct", DataType::Utf8, false),
- Field::new("l_shipmode", DataType::Utf8, false),
- Field::new("l_comment", DataType::Utf8, false),
- ]),
-
- "nation" => Schema::new(vec![
- Field::new("n_nationkey", DataType::Int32, false),
- Field::new("n_name", DataType::Utf8, false),
- Field::new("n_regionkey", DataType::Int32, false),
- Field::new("n_comment", DataType::Utf8, false),
- ]),
-
- "region" => Schema::new(vec![
- Field::new("r_regionkey", DataType::Int32, false),
- Field::new("r_name", DataType::Utf8, false),
- Field::new("r_comment", DataType::Utf8, false),
- ]),
-
- _ => unimplemented!(),
- }
-}
diff --git a/ballista/rust/benchmarks/tpch/.dockerignore
b/benchmarks/.dockerignore
similarity index 100%
rename from ballista/rust/benchmarks/tpch/.dockerignore
rename to benchmarks/.dockerignore
diff --git a/ballista/rust/benchmarks/tpch/.gitignore b/benchmarks/.gitignore
similarity index 100%
rename from ballista/rust/benchmarks/tpch/.gitignore
rename to benchmarks/.gitignore
diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index 6eb6ab9..3562266 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -34,6 +34,7 @@ snmalloc = ["snmalloc-rs"]
arrow = { git = "https://github.com/apache/arrow-rs", rev =
"c3fe3bab9905739fdda75301dab07a18c91731bd" }
parquet = { git = "https://github.com/apache/arrow-rs", rev =
"c3fe3bab9905739fdda75301dab07a18c91731bd" }
datafusion = { path = "../datafusion" }
+ballista = { path = "../ballista/rust/client" }
structopt = { version = "0.3", default-features = false }
tokio = { version = "^1.0", features = ["macros", "rt", "rt-multi-thread"] }
futures = "0.3"
diff --git a/benchmarks/README.md b/benchmarks/README.md
index 7460477..e003d96 100644
--- a/benchmarks/README.md
+++ b/benchmarks/README.md
@@ -17,53 +17,47 @@
under the License.
-->
-# Apache Arrow Rust Benchmarks
+# DataFusion and Ballista Benchmarks
This crate contains benchmarks based on popular public data sets and open
source benchmark suites, making it easy to
run real-world benchmarks to help with performance and scalability testing and
for comparing performance with other Arrow
implementations as well as other query engines.
-Currently, only DataFusion benchmarks exist, but the plan is to add benchmarks
for the arrow, flight, and parquet
-crates as well.
-
## Benchmark derived from TPC-H
These benchmarks are derived from the [TPC-H][1] benchmark.
-Data for this benchmark can be generated using the [tpch-dbgen][2]
command-line tool. Run the following commands to
-clone the repository and build the source code.
+## Generating Test Data
+
+TPC-H data can be generated using the `tpch-gen.sh` script, which creates a
Docker image containing the TPC-H data
+generator.
```bash
-git clone [email protected]:databricks/tpch-dbgen.git
-cd tpch-dbgen
-make
-export TPCH_DATA=$(pwd)
+./tpch-gen.sh
```
-Data can now be generated with the following command. Note that `-s 1` means
use Scale Factor 1 or ~1 GB of
-data. This value can be increased to generate larger data sets.
+Data will be generated into the `data` subdirectory and will not be checked in
because this directory has been added
+to the `.gitignore` file.
-```bash
-./dbgen -vf -s 1
-```
+## Running the DataFusion Benchmarks
-The benchmark can then be run (assuming the data created from `dbgen` is in
`/mnt/tpch-dbgen`) with a command such as:
+The benchmark can then be run (assuming the data generated by `tpch-gen.sh` is in
`./data`) with a command such as:
```bash
-cargo run --release --bin tpch -- benchmark --iterations 3 --path
/mnt/tpch-dbgen --format tbl --query 1 --batch-size 4096
+cargo run --release --bin tpch -- benchmark --iterations 3 --path ./data
--format tbl --query 1 --batch-size 4096
```
You can enable the features `simd` (to use SIMD instructions) and/or
`mimalloc` or `snmalloc` (to use either the mimalloc or snmalloc allocator) as
features by passing them in as `--features`:
```
-cargo run --release --features "simd mimalloc" --bin tpch -- benchmark
--iterations 3 --path /mnt/tpch-dbgen --format tbl --query 1 --batch-size 4096
+cargo run --release --features "simd mimalloc" --bin tpch -- benchmark
--iterations 3 --path ./data --format tbl --query 1 --batch-size 4096
```
The benchmark program also supports CSV and Parquet input file formats and a
utility is provided to convert from `tbl`
(generated by the `dbgen` utility) to CSV and Parquet.
```bash
-cargo run --release --bin tpch -- convert --input /mnt/tpch-dbgen --output
/mnt/tpch-parquet --format parquet
+cargo run --release --bin tpch -- convert --input ./data --output
/mnt/tpch-parquet --format parquet
```
This utility does not yet provide support for changing the number of
partitions when performing the conversion. Another
@@ -97,9 +91,78 @@ docker run -v /mnt:/mnt -it
ballistacompute/spark-benchmarks:0.4.0-SNAPSHOT \
--partitions 64
```
+## Running the Ballista Benchmarks
+
+To run the benchmarks it is necessary to have at least one Ballista scheduler
and one Ballista executor running.
+
+To run the scheduler from source:
+
+```bash
+cd $ARROW_HOME/ballista/rust/scheduler
+RUST_LOG=info cargo run --release
+```
+
+By default the scheduler will bind to `0.0.0.0` and listen on port 50050.
+
+To run the executor from source:
+
+```bash
+cd $ARROW_HOME/ballista/rust/executor
+RUST_LOG=info cargo run --release
+```
+
+By default the executor will bind to `0.0.0.0` and listen on port 50051.
+
+You can add SIMD/snmalloc/LTO flags to improve speed (with longer build times):
+
+```
+RUST_LOG=info RUSTFLAGS='-C target-cpu=native -C lto -C codegen-units=1 -C
embed-bitcode' cargo run --release --bin executor --features "simd snmalloc"
--target x86_64-unknown-linux-gnu
+```
+
+To run the benchmarks:
+
+```bash
+cd $ARROW_HOME/benchmarks
+cargo run --release --bin tpch -- benchmark --host localhost --port 50050 --query 1 --path
$(pwd)/data --format tbl
+```
+
+## Running the Ballista Benchmarks on docker-compose
+
+To start a Rust scheduler and executor using Docker Compose:
+
+```bash
+cd $ARROW_HOME
+./dev/build-rust.sh
+cd $ARROW_HOME/benchmarks
+docker-compose up
+```
+
+Then you can run the benchmark with:
+
+```bash
+docker-compose run ballista-client cargo run --bin tpch -- benchmark --host
ballista-scheduler --port 50050 --query 1 --path /data --format tbl
+```
+
+## Expected output
+
+Query 1 should produce the following output when executed
against the SF=1 (scale factor 1) dataset.
+
+```
++--------------+--------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------+-------------+
+| l_returnflag | l_linestatus | sum_qty | sum_base_price | sum_disc_price
| sum_charge | avg_qty | avg_price | avg_disc
| count_order |
++--------------+--------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------+-------------+
+| A | F | 37734107 | 56586554400.73001 |
53758257134.870026 | 55909065222.82768 | 25.522005853257337 |
38273.12973462168 | 0.049985295838396455 | 1478493 |
+| N | F | 991417 | 1487504710.3799996 |
1413082168.0541 | 1469649223.1943746 | 25.516471920522985 |
38284.467760848296 | 0.05009342667421622 | 38854 |
+| N | O | 74476023 | 111701708529.50996 |
106118209986.10472 | 110367023144.56622 | 25.502229680934594 | 38249.1238377803
| 0.049996589476752576 | 2920373 |
+| R | F | 37719753 | 56568041380.90001 |
53741292684.60399 | 55889619119.83194 | 25.50579361269077 |
38250.854626099666 | 0.05000940583012587 | 1478870 |
++--------------+--------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------+-------------+
+Query 1 iteration 0 took 1956.1 ms
+Query 1 avg time: 1956.11 ms
+```
+
## NYC Taxi Benchmark
-These benchmarks are based on the [New York Taxi and Limousine Commission][3]
data set.
+These benchmarks are based on the [New York Taxi and Limousine Commission][2]
data set.
```bash
cargo run --release --bin nyctaxi -- --iterations 3 --path /mnt/nyctaxi/csv
--format csv --batch-size 4096
@@ -116,5 +179,4 @@ Query 'fare_amt_by_passenger' iteration 2 took 7969 ms
```
[1]: http://www.tpc.org/tpch/
-[2]: https://github.com/databricks/tpch-dbgen
-[3]: https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page
+[2]: https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page
diff --git a/ballista/rust/benchmarks/tpch/docker-compose.yaml
b/benchmarks/docker-compose.yaml
similarity index 90%
rename from ballista/rust/benchmarks/tpch/docker-compose.yaml
rename to benchmarks/docker-compose.yaml
index f872ce1..6015dba 100644
--- a/ballista/rust/benchmarks/tpch/docker-compose.yaml
+++ b/benchmarks/docker-compose.yaml
@@ -20,7 +20,7 @@ services:
image: quay.io/coreos/etcd:v3.4.9
command: "etcd -advertise-client-urls http://etcd:2379 -listen-client-urls
http://0.0.0.0:2379"
ballista-scheduler:
- image: ballistacompute/ballista-rust:0.4.2-SNAPSHOT
+ image: ballistacompute/ballista-rust:0.5.0-SNAPSHOT
command: "/scheduler --config-backend etcd --etcd-urls etcd:2379
--bind-host 0.0.0.0 --port 50050"
environment:
- RUST_LOG=ballista=debug
@@ -29,7 +29,7 @@ services:
depends_on:
- etcd
ballista-executor-1:
- image: ballistacompute/ballista-rust:0.4.2-SNAPSHOT
+ image: ballistacompute/ballista-rust:0.5.0-SNAPSHOT
command: "/executor --bind-host 0.0.0.0 --port 50051 --external-host
ballista-executor-1 --scheduler-host ballista-scheduler"
environment:
- RUST_LOG=info
@@ -38,7 +38,7 @@ services:
depends_on:
- ballista-scheduler
ballista-executor-2:
- image: ballistacompute/ballista-rust:0.4.2-SNAPSHOT
+ image: ballistacompute/ballista-rust:0.5.0-SNAPSHOT
command: "/executor --bind-host 0.0.0.0 --port 50052 --external-host
ballista-executor-2 --scheduler-host ballista-scheduler"
environment:
- RUST_LOG=info
@@ -47,7 +47,7 @@ services:
depends_on:
- ballista-scheduler
ballista-client:
- image: ballistacompute/ballista-rust:0.4.2-SNAPSHOT
+ image: ballistacompute/ballista-rust:0.5.0-SNAPSHOT
command: "/bin/sh" # do nothing
working_dir: /ballista/benchmarks/tpch
environment:
diff --git a/ballista/rust/benchmarks/tpch/entrypoint.sh
b/benchmarks/entrypoint.sh
similarity index 100%
rename from ballista/rust/benchmarks/tpch/entrypoint.sh
rename to benchmarks/entrypoint.sh
diff --git a/ballista/rust/benchmarks/tpch/queries/q1.sql
b/benchmarks/queries/q1.sql
similarity index 100%
rename from ballista/rust/benchmarks/tpch/queries/q1.sql
rename to benchmarks/queries/q1.sql
diff --git a/ballista/rust/benchmarks/tpch/queries/q10.sql
b/benchmarks/queries/q10.sql
similarity index 100%
rename from ballista/rust/benchmarks/tpch/queries/q10.sql
rename to benchmarks/queries/q10.sql
diff --git a/ballista/rust/benchmarks/tpch/queries/q11.sql
b/benchmarks/queries/q11.sql
similarity index 100%
rename from ballista/rust/benchmarks/tpch/queries/q11.sql
rename to benchmarks/queries/q11.sql
diff --git a/ballista/rust/benchmarks/tpch/queries/q12.sql
b/benchmarks/queries/q12.sql
similarity index 100%
rename from ballista/rust/benchmarks/tpch/queries/q12.sql
rename to benchmarks/queries/q12.sql
diff --git a/ballista/rust/benchmarks/tpch/queries/q13.sql
b/benchmarks/queries/q13.sql
similarity index 100%
rename from ballista/rust/benchmarks/tpch/queries/q13.sql
rename to benchmarks/queries/q13.sql
diff --git a/ballista/rust/benchmarks/tpch/queries/q14.sql
b/benchmarks/queries/q14.sql
similarity index 100%
rename from ballista/rust/benchmarks/tpch/queries/q14.sql
rename to benchmarks/queries/q14.sql
diff --git a/ballista/rust/benchmarks/tpch/queries/q16.sql
b/benchmarks/queries/q16.sql
similarity index 100%
rename from ballista/rust/benchmarks/tpch/queries/q16.sql
rename to benchmarks/queries/q16.sql
diff --git a/ballista/rust/benchmarks/tpch/queries/q17.sql
b/benchmarks/queries/q17.sql
similarity index 100%
rename from ballista/rust/benchmarks/tpch/queries/q17.sql
rename to benchmarks/queries/q17.sql
diff --git a/ballista/rust/benchmarks/tpch/queries/q18.sql
b/benchmarks/queries/q18.sql
similarity index 100%
rename from ballista/rust/benchmarks/tpch/queries/q18.sql
rename to benchmarks/queries/q18.sql
diff --git a/ballista/rust/benchmarks/tpch/queries/q19.sql
b/benchmarks/queries/q19.sql
similarity index 100%
rename from ballista/rust/benchmarks/tpch/queries/q19.sql
rename to benchmarks/queries/q19.sql
diff --git a/ballista/rust/benchmarks/tpch/queries/q2.sql
b/benchmarks/queries/q2.sql
similarity index 100%
rename from ballista/rust/benchmarks/tpch/queries/q2.sql
rename to benchmarks/queries/q2.sql
diff --git a/ballista/rust/benchmarks/tpch/queries/q20.sql
b/benchmarks/queries/q20.sql
similarity index 100%
rename from ballista/rust/benchmarks/tpch/queries/q20.sql
rename to benchmarks/queries/q20.sql
diff --git a/ballista/rust/benchmarks/tpch/queries/q21.sql
b/benchmarks/queries/q21.sql
similarity index 100%
rename from ballista/rust/benchmarks/tpch/queries/q21.sql
rename to benchmarks/queries/q21.sql
diff --git a/ballista/rust/benchmarks/tpch/queries/q22.sql
b/benchmarks/queries/q22.sql
similarity index 100%
rename from ballista/rust/benchmarks/tpch/queries/q22.sql
rename to benchmarks/queries/q22.sql
diff --git a/ballista/rust/benchmarks/tpch/queries/q3.sql
b/benchmarks/queries/q3.sql
similarity index 100%
rename from ballista/rust/benchmarks/tpch/queries/q3.sql
rename to benchmarks/queries/q3.sql
diff --git a/ballista/rust/benchmarks/tpch/queries/q4.sql
b/benchmarks/queries/q4.sql
similarity index 100%
rename from ballista/rust/benchmarks/tpch/queries/q4.sql
rename to benchmarks/queries/q4.sql
diff --git a/ballista/rust/benchmarks/tpch/queries/q5.sql
b/benchmarks/queries/q5.sql
similarity index 100%
rename from ballista/rust/benchmarks/tpch/queries/q5.sql
rename to benchmarks/queries/q5.sql
diff --git a/ballista/rust/benchmarks/tpch/queries/q6.sql
b/benchmarks/queries/q6.sql
similarity index 100%
rename from ballista/rust/benchmarks/tpch/queries/q6.sql
rename to benchmarks/queries/q6.sql
diff --git a/ballista/rust/benchmarks/tpch/queries/q7.sql
b/benchmarks/queries/q7.sql
similarity index 100%
rename from ballista/rust/benchmarks/tpch/queries/q7.sql
rename to benchmarks/queries/q7.sql
diff --git a/ballista/rust/benchmarks/tpch/queries/q8.sql
b/benchmarks/queries/q8.sql
similarity index 100%
rename from ballista/rust/benchmarks/tpch/queries/q8.sql
rename to benchmarks/queries/q8.sql
diff --git a/ballista/rust/benchmarks/tpch/queries/q9.sql
b/benchmarks/queries/q9.sql
similarity index 100%
rename from ballista/rust/benchmarks/tpch/queries/q9.sql
rename to benchmarks/queries/q9.sql
diff --git a/ballista/rust/benchmarks/tpch/run.sh b/benchmarks/run.sh
similarity index 99%
rename from ballista/rust/benchmarks/tpch/run.sh
rename to benchmarks/run.sh
index c8a36b6..fd97ff9 100755
--- a/ballista/rust/benchmarks/tpch/run.sh
+++ b/benchmarks/run.sh
@@ -19,6 +19,7 @@ set -e
# This bash script is meant to be run inside the docker-compose environment.
Check the README for instructions
+cd /
for query in 1 3 5 6 10 12
do
/tpch benchmark --host ballista-scheduler --port 50050 --query $query --path
/data --format tbl --iterations 1 --debug
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index b203ceb..fd9f052 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -17,21 +17,26 @@
//! Benchmark derived from TPC-H. This is not an official TPC-H benchmark.
-use std::time::Instant;
use std::{
+ collections::HashMap,
+ fs,
+ iter::Iterator,
path::{Path, PathBuf},
sync::Arc,
+ time::Instant,
};
+use futures::StreamExt;
+
use arrow::datatypes::{DataType, Field, Schema};
use arrow::util::pretty;
+use ballista::context::BallistaContext;
use datafusion::datasource::parquet::ParquetTable;
use datafusion::datasource::{CsvFile, MemTable, TableProvider};
use datafusion::error::{DataFusionError, Result};
use datafusion::logical_plan::LogicalPlan;
use datafusion::physical_plan::collect;
use datafusion::prelude::*;
-
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;
use structopt::StructOpt;
@@ -44,7 +49,7 @@ static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
#[global_allocator]
static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;
-#[derive(Debug, StructOpt)]
+#[derive(Debug, StructOpt, Clone)]
struct BenchmarkOpt {
/// Query number
#[structopt(short, long)]
@@ -81,6 +86,14 @@ struct BenchmarkOpt {
/// Number of partitions to create when using MemTable as input
#[structopt(short = "n", long = "partitions", default_value = "8")]
partitions: usize,
+
+ /// Ballista executor host
+ #[structopt(long = "host")]
+ host: Option<String>,
+
+ /// Ballista executor port
+ #[structopt(long = "port")]
+ port: Option<u16>,
}
#[derive(Debug, StructOpt)]
@@ -125,12 +138,20 @@ const TABLES: &[&str] = &[
async fn main() -> Result<()> {
env_logger::init();
match TpchOpt::from_args() {
- TpchOpt::Benchmark(opt) => benchmark(opt).await.map(|_| ()),
+ TpchOpt::Benchmark(opt) => {
+ if opt.host.is_some() && opt.port.is_some() {
+ benchmark_ballista(opt).await.map(|_| ())
+ } else {
+ benchmark_datafusion(opt).await.map(|_| ())
+ }
+ }
TpchOpt::Convert(opt) => convert_tbl(opt).await,
}
}
-async fn benchmark(opt: BenchmarkOpt) ->
Result<Vec<arrow::record_batch::RecordBatch>> {
+async fn benchmark_datafusion(
+ opt: BenchmarkOpt,
+) -> Result<Vec<arrow::record_batch::RecordBatch>> {
println!("Running benchmarks with the following options: {:?}", opt);
let config = ExecutionConfig::new()
.with_concurrency(opt.concurrency)
@@ -181,832 +202,97 @@ async fn benchmark(opt: BenchmarkOpt) ->
Result<Vec<arrow::record_batch::RecordB
Ok(result)
}
-fn create_logical_plan(ctx: &mut ExecutionContext, query: usize) ->
Result<LogicalPlan> {
- match query {
- // original
- // 1 => ctx.create_logical_plan(
- // "select
- // l_returnflag,
- // l_linestatus,
- // sum(l_quantity) as sum_qty,
- // sum(l_extendedprice) as sum_base_price,
- // sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
- // sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as
sum_charge,
- // avg(l_quantity) as avg_qty,
- // avg(l_extendedprice) as avg_price,
- // avg(l_discount) as avg_disc,
- // count(*) as count_order
- // from
- // lineitem
- // where
- // l_shipdate <= date '1998-12-01' - interval '90' day (3)
- // group by
- // l_returnflag,
- // l_linestatus
- // order by
- // l_returnflag,
- // l_linestatus;"
- // ),
- 1 => ctx.create_logical_plan(
- "select
- l_returnflag,
- l_linestatus,
- sum(l_quantity) as sum_qty,
- sum(l_extendedprice) as sum_base_price,
- sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
- sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as
sum_charge,
- avg(l_quantity) as avg_qty,
- avg(l_extendedprice) as avg_price,
- avg(l_discount) as avg_disc,
- count(*) as count_order
- from
- lineitem
- where
- l_shipdate <= date '1998-09-02'
- group by
- l_returnflag,
- l_linestatus
- order by
- l_returnflag,
- l_linestatus;",
- ),
-
- 2 => ctx.create_logical_plan(
- "select
- s_acctbal,
- s_name,
- n_name,
- p_partkey,
- p_mfgr,
- s_address,
- s_phone,
- s_comment
- from
- part,
- supplier,
- partsupp,
- nation,
- region
- where
- p_partkey = ps_partkey
- and s_suppkey = ps_suppkey
- and p_size = 15
- and p_type like '%BRASS'
- and s_nationkey = n_nationkey
- and n_regionkey = r_regionkey
- and r_name = 'EUROPE'
- and ps_supplycost = (
- select
- min(ps_supplycost)
- from
- partsupp,
- supplier,
- nation,
- region
- where
- p_partkey = ps_partkey
- and s_suppkey = ps_suppkey
- and s_nationkey = n_nationkey
- and n_regionkey = r_regionkey
- and r_name = 'EUROPE'
- )
- order by
- s_acctbal desc,
- n_name,
- s_name,
- p_partkey;",
- ),
-
- 3 => ctx.create_logical_plan(
- "select
- l_orderkey,
- sum(l_extendedprice * (1 - l_discount)) as revenue,
- o_orderdate,
- o_shippriority
- from
- customer,
- orders,
- lineitem
- where
- c_mktsegment = 'BUILDING'
- and c_custkey = o_custkey
- and l_orderkey = o_orderkey
- and o_orderdate < date '1995-03-15'
- and l_shipdate > date '1995-03-15'
- group by
- l_orderkey,
- o_orderdate,
- o_shippriority
- order by
- revenue desc,
- o_orderdate;",
- ),
-
- 4 => ctx.create_logical_plan(
- "select
- o_orderpriority,
- count(*) as order_count
- from
- orders
- where
- o_orderdate >= '1993-07-01'
- and o_orderdate < date '1993-07-01' + interval '3' month
- and exists (
- select
- *
- from
- lineitem
- where
- l_orderkey = o_orderkey
- and l_commitdate < l_receiptdate
- )
- group by
- o_orderpriority
- order by
- o_orderpriority;",
- ),
-
- // original
- // 5 => ctx.create_logical_plan(
- // "select
- // n_name,
- // sum(l_extendedprice * (1 - l_discount)) as revenue
- // from
- // customer,
- // orders,
- // lineitem,
- // supplier,
- // nation,
- // region
- // where
- // c_custkey = o_custkey
- // and l_orderkey = o_orderkey
- // and l_suppkey = s_suppkey
- // and c_nationkey = s_nationkey
- // and s_nationkey = n_nationkey
- // and n_regionkey = r_regionkey
- // and r_name = 'ASIA'
- // and o_orderdate >= date '1994-01-01'
- // and o_orderdate < date '1994-01-01' + interval '1' year
- // group by
- // n_name
- // order by
- // revenue desc;"
- // ),
- 5 => ctx.create_logical_plan(
- "select
- n_name,
- sum(l_extendedprice * (1 - l_discount)) as revenue
- from
- customer,
- orders,
- lineitem,
- supplier,
- nation,
- region
- where
- c_custkey = o_custkey
- and l_orderkey = o_orderkey
- and l_suppkey = s_suppkey
- and c_nationkey = s_nationkey
- and s_nationkey = n_nationkey
- and n_regionkey = r_regionkey
- and r_name = 'ASIA'
- and o_orderdate >= date '1994-01-01'
- and o_orderdate < date '1995-01-01'
- group by
- n_name
- order by
- revenue desc;",
- ),
-
- // original
- // 6 => ctx.create_logical_plan(
- // "select
- // sum(l_extendedprice * l_discount) as revenue
- // from
- // lineitem
- // where
- // l_shipdate >= date '1994-01-01'
- // and l_shipdate < date '1994-01-01' + interval '1' year
- // and l_discount between .06 - 0.01 and .06 + 0.01
- // and l_quantity < 24;"
- // ),
- 6 => ctx.create_logical_plan(
- "select
- sum(l_extendedprice * l_discount) as revenue
- from
- lineitem
- where
- l_shipdate >= date '1994-01-01'
- and l_shipdate < date '1995-01-01'
- and l_discount between .06 - 0.01 and .06 + 0.01
- and l_quantity < 24;",
- ),
-
- 7 => ctx.create_logical_plan(
- "select
- supp_nation,
- cust_nation,
- l_year,
- sum(volume) as revenue
- from
- (
- select
- n1.n_name as supp_nation,
- n2.n_name as cust_nation,
- extract(year from l_shipdate) as l_year,
- l_extendedprice * (1 - l_discount) as volume
- from
- supplier,
- lineitem,
- orders,
- customer,
- nation n1,
- nation n2
- where
- s_suppkey = l_suppkey
- and o_orderkey = l_orderkey
- and c_custkey = o_custkey
- and s_nationkey = n1.n_nationkey
- and c_nationkey = n2.n_nationkey
- and (
- (n1.n_name = 'FRANCE' and n2.n_name = 'GERMANY')
- or (n1.n_name = 'GERMANY' and n2.n_name = 'FRANCE')
- )
- and l_shipdate between date '1995-01-01' and date
'1996-12-31'
- ) as shipping
- group by
- supp_nation,
- cust_nation,
- l_year
- order by
- supp_nation,
- cust_nation,
- l_year;",
- ),
-
- 8 => ctx.create_logical_plan(
- "select
- o_year,
- sum(case
- when nation = 'BRAZIL' then volume
- else 0
- end) / sum(volume) as mkt_share
- from
- (
- select
- extract(year from o_orderdate) as o_year,
- l_extendedprice * (1 - l_discount) as volume,
- n2.n_name as nation
- from
- part,
- supplier,
- lineitem,
- orders,
- customer,
- nation n1,
- nation n2,
- region
- where
- p_partkey = l_partkey
- and s_suppkey = l_suppkey
- and l_orderkey = o_orderkey
- and o_custkey = c_custkey
- and c_nationkey = n1.n_nationkey
- and n1.n_regionkey = r_regionkey
- and r_name = 'AMERICA'
- and s_nationkey = n2.n_nationkey
- and o_orderdate between date '1995-01-01' and date
'1996-12-31'
- and p_type = 'ECONOMY ANODIZED STEEL'
- ) as all_nations
- group by
- o_year
- order by
- o_year;",
- ),
-
- 9 => ctx.create_logical_plan(
- "select
- nation,
- o_year,
- sum(amount) as sum_profit
- from
- (
- select
- n_name as nation,
- extract(year from o_orderdate) as o_year,
- l_extendedprice * (1 - l_discount) - ps_supplycost *
l_quantity as amount
- from
- part,
- supplier,
- lineitem,
- partsupp,
- orders,
- nation
- where
- s_suppkey = l_suppkey
- and ps_suppkey = l_suppkey
- and ps_partkey = l_partkey
- and p_partkey = l_partkey
- and o_orderkey = l_orderkey
- and s_nationkey = n_nationkey
- and p_name like '%green%'
- ) as profit
- group by
- nation,
- o_year
- order by
- nation,
- o_year desc;",
- ),
-
- // 10 => ctx.create_logical_plan(
- // "select
- // c_custkey,
- // c_name,
- // sum(l_extendedprice * (1 - l_discount)) as revenue,
- // c_acctbal,
- // n_name,
- // c_address,
- // c_phone,
- // c_comment
- // from
- // customer,
- // orders,
- // lineitem,
- // nation
- // where
- // c_custkey = o_custkey
- // and l_orderkey = o_orderkey
- // and o_orderdate >= date '1993-10-01'
- // and o_orderdate < date '1993-10-01' + interval '3' month
- // and l_returnflag = 'R'
- // and c_nationkey = n_nationkey
- // group by
- // c_custkey,
- // c_name,
- // c_acctbal,
- // c_phone,
- // n_name,
- // c_address,
- // c_comment
- // order by
- // revenue desc;"
- // ),
- 10 => ctx.create_logical_plan(
- "select
- c_custkey,
- c_name,
- sum(l_extendedprice * (1 - l_discount)) as revenue,
- c_acctbal,
- n_name,
- c_address,
- c_phone,
- c_comment
- from
- customer,
- orders,
- lineitem,
- nation
- where
- c_custkey = o_custkey
- and l_orderkey = o_orderkey
- and o_orderdate >= date '1993-10-01'
- and o_orderdate < date '1994-01-01'
- and l_returnflag = 'R'
- and c_nationkey = n_nationkey
- group by
- c_custkey,
- c_name,
- c_acctbal,
- c_phone,
- n_name,
- c_address,
- c_comment
- order by
- revenue desc;",
- ),
-
- 11 => ctx.create_logical_plan(
- "select
- ps_partkey,
- sum(ps_supplycost * ps_availqty) as value
- from
- partsupp,
- supplier,
- nation
- where
- ps_suppkey = s_suppkey
- and s_nationkey = n_nationkey
- and n_name = 'GERMANY'
- group by
- ps_partkey having
- sum(ps_supplycost * ps_availqty) > (
- select
- sum(ps_supplycost * ps_availqty) * 0.0001
- from
- partsupp,
- supplier,
- nation
- where
- ps_suppkey = s_suppkey
- and s_nationkey = n_nationkey
- and n_name = 'GERMANY'
- )
- order by
- value desc;",
- ),
-
- // original
- // 12 => ctx.create_logical_plan(
- // "select
- // l_shipmode,
- // sum(case
- // when o_orderpriority = '1-URGENT'
- // or o_orderpriority = '2-HIGH'
- // then 1
- // else 0
- // end) as high_line_count,
- // sum(case
- // when o_orderpriority <> '1-URGENT'
- // and o_orderpriority <> '2-HIGH'
- // then 1
- // else 0
- // end) as low_line_count
- // from
- // orders,
- // lineitem
- // where
- // o_orderkey = l_orderkey
- // and l_shipmode in ('MAIL', 'SHIP')
- // and l_commitdate < l_receiptdate
- // and l_shipdate < l_commitdate
- // and l_receiptdate >= date '1994-01-01'
- // and l_receiptdate < date '1994-01-01' + interval '1' year
- // group by
- // l_shipmode
- // order by
- // l_shipmode;"
- // ),
- 12 => ctx.create_logical_plan(
- "select
- l_shipmode,
- sum(case
- when o_orderpriority = '1-URGENT'
- or o_orderpriority = '2-HIGH'
- then 1
- else 0
- end) as high_line_count,
- sum(case
- when o_orderpriority <> '1-URGENT'
- and o_orderpriority <> '2-HIGH'
- then 1
- else 0
- end) as low_line_count
- from
- lineitem
- join
- orders
- on
- l_orderkey = o_orderkey
- where
- l_shipmode in ('MAIL', 'SHIP')
- and l_commitdate < l_receiptdate
- and l_shipdate < l_commitdate
- and l_receiptdate >= date '1994-01-01'
- and l_receiptdate < date '1995-01-01'
- group by
- l_shipmode
- order by
- l_shipmode;",
- ),
-
- 13 => ctx.create_logical_plan(
- "select
- c_count,
- count(*) as custdist
- from
- (
- select
- c_custkey,
- count(o_orderkey)
- from
- customer left outer join orders on
- c_custkey = o_custkey
- and o_comment not like '%special%requests%'
- group by
- c_custkey
- ) as c_orders (c_custkey, c_count)
- group by
- c_count
- order by
- custdist desc,
- c_count desc;",
- ),
-
- 14 => ctx.create_logical_plan(
- "select
- 100.00 * sum(case
- when p_type like 'PROMO%'
- then l_extendedprice * (1 - l_discount)
- else 0
- end) / sum(l_extendedprice * (1 - l_discount)) as promo_revenue
- from
- lineitem,
- part
- where
- l_partkey = p_partkey
- and l_shipdate >= date '1995-09-01'
- and l_shipdate < date '1995-10-01';",
- ),
-
- 15 => ctx.create_logical_plan(
- "create view revenue0 (supplier_no, total_revenue) as
- select
- l_suppkey,
- sum(l_extendedprice * (1 - l_discount))
- from
- lineitem
- where
- l_shipdate >= date '1996-01-01'
- and l_shipdate < date '1996-01-01' + interval '3' month
- group by
- l_suppkey;
-
- select
- s_suppkey,
- s_name,
- s_address,
- s_phone,
- total_revenue
- from
- supplier,
- revenue0
- where
- s_suppkey = supplier_no
- and total_revenue = (
- select
- max(total_revenue)
- from
- revenue0
- )
- order by
- s_suppkey;
-
- drop view revenue0;",
- ),
-
- 16 => ctx.create_logical_plan(
- "select
- p_brand,
- p_type,
- p_size,
- count(distinct ps_suppkey) as supplier_cnt
- from
- partsupp,
- part
- where
- p_partkey = ps_partkey
- and p_brand <> 'Brand#45'
- and p_type not like 'MEDIUM POLISHED%'
- and p_size in (49, 14, 23, 45, 19, 3, 36, 9)
- and ps_suppkey not in (
- select
- s_suppkey
- from
- supplier
- where
- s_comment like '%Customer%Complaints%'
- )
- group by
- p_brand,
- p_type,
- p_size
- order by
- supplier_cnt desc,
- p_brand,
- p_type,
- p_size;",
- ),
-
- 17 => ctx.create_logical_plan(
- "select
- sum(l_extendedprice) / 7.0 as avg_yearly
- from
- lineitem,
- part
- where
- p_partkey = l_partkey
- and p_brand = 'Brand#23'
- and p_container = 'MED BOX'
- and l_quantity < (
- select
- 0.2 * avg(l_quantity)
- from
- lineitem
- where
- l_partkey = p_partkey
- );",
- ),
-
- 18 => ctx.create_logical_plan(
- "select
- c_name,
- c_custkey,
- o_orderkey,
- o_orderdate,
- o_totalprice,
- sum(l_quantity)
- from
- customer,
- orders,
- lineitem
- where
- o_orderkey in (
- select
- l_orderkey
- from
- lineitem
- group by
- l_orderkey having
- sum(l_quantity) > 300
- )
- and c_custkey = o_custkey
- and o_orderkey = l_orderkey
- group by
- c_name,
- c_custkey,
- o_orderkey,
- o_orderdate,
- o_totalprice
- order by
- o_totalprice desc,
- o_orderdate;",
- ),
-
- 19 => ctx.create_logical_plan(
- "select
- sum(l_extendedprice* (1 - l_discount)) as revenue
- from
- lineitem,
- part
- where
- (
- p_partkey = l_partkey
- and p_brand = 'Brand#12'
- and p_container in ('SM CASE', 'SM BOX', 'SM PACK', 'SM
PKG')
- and l_quantity >= 1 and l_quantity <= 1 + 10
- and p_size between 1 and 5
- and l_shipmode in ('AIR', 'AIR REG')
- and l_shipinstruct = 'DELIVER IN PERSON'
- )
- or
- (
- p_partkey = l_partkey
- and p_brand = 'Brand#23'
- and p_container in ('MED BAG', 'MED BOX', 'MED PKG', 'MED
PACK')
- and l_quantity >= 10 and l_quantity <= 10 + 10
- and p_size between 1 and 10
- and l_shipmode in ('AIR', 'AIR REG')
- and l_shipinstruct = 'DELIVER IN PERSON'
- )
- or
- (
- p_partkey = l_partkey
- and p_brand = 'Brand#34'
- and p_container in ('LG CASE', 'LG BOX', 'LG PACK', 'LG
PKG')
- and l_quantity >= 20 and l_quantity <= 20 + 10
- and p_size between 1 and 15
- and l_shipmode in ('AIR', 'AIR REG')
- and l_shipinstruct = 'DELIVER IN PERSON'
- );",
- ),
-
- 20 => ctx.create_logical_plan(
- "select
- s_name,
- s_address
- from
- supplier,
- nation
- where
- s_suppkey in (
- select
- ps_suppkey
- from
- partsupp
- where
- ps_partkey in (
- select
- p_partkey
- from
- part
- where
- p_name like 'forest%'
- )
- and ps_availqty > (
- select
- 0.5 * sum(l_quantity)
- from
- lineitem
- where
- l_partkey = ps_partkey
- and l_suppkey = ps_suppkey
- and l_shipdate >= date '1994-01-01'
- and l_shipdate < 'date 1994-01-01' + interval
'1' year
- )
- )
- and s_nationkey = n_nationkey
- and n_name = 'CANADA'
- order by
- s_name;",
- ),
-
- 21 => ctx.create_logical_plan(
- "select
- s_name,
- count(*) as numwait
- from
- supplier,
- lineitem l1,
- orders,
- nation
- where
- s_suppkey = l1.l_suppkey
- and o_orderkey = l1.l_orderkey
- and o_orderstatus = 'F'
- and l1.l_receiptdate > l1.l_commitdate
- and exists (
- select
- *
- from
- lineitem l2
- where
- l2.l_orderkey = l1.l_orderkey
- and l2.l_suppkey <> l1.l_suppkey
- )
- and not exists (
- select
- *
- from
- lineitem l3
- where
- l3.l_orderkey = l1.l_orderkey
- and l3.l_suppkey <> l1.l_suppkey
- and l3.l_receiptdate > l3.l_commitdate
- )
- and s_nationkey = n_nationkey
- and n_name = 'SAUDI ARABIA'
- group by
- s_name
- order by
- numwait desc,
- s_name;",
- ),
-
- 22 => ctx.create_logical_plan(
- "select
- cntrycode,
- count(*) as numcust,
- sum(c_acctbal) as totacctbal
- from
- (
- select
- substring(c_phone from 1 for 2) as cntrycode,
- c_acctbal
- from
- customer
- where
- substring(c_phone from 1 for 2) in
- ('13', '31', '23', '29', '30', '18', '17')
- and c_acctbal > (
- select
- avg(c_acctbal)
- from
- customer
- where
- c_acctbal > 0.00
- and substring(c_phone from 1 for 2) in
- ('13', '31', '23', '29', '30', '18', '17')
- )
- and not exists (
- select
- *
- from
- orders
- where
- o_custkey = c_custkey
- )
- ) as custsale
- group by
- cntrycode
- order by
- cntrycode;",
- ),
-
- _ => unimplemented!("invalid query. Expected value between 1 and 22"),
+async fn benchmark_ballista(opt: BenchmarkOpt) -> Result<()> {
+ println!("Running benchmarks with the following options: {:?}", opt);
+
+ let mut settings = HashMap::new();
+ settings.insert("batch.size".to_owned(), format!("{}", opt.batch_size));
+
+ let ctx =
+ BallistaContext::remote(opt.host.unwrap().as_str(), opt.port.unwrap(),
settings);
+
+ // register tables with Ballista context
+ let path = opt.path.to_str().unwrap();
+ let file_format = opt.file_format.as_str();
+ for table in TABLES {
+ match file_format {
+ // dbgen creates .tbl ('|' delimited) files without header
+ "tbl" => {
+ let path = format!("{}/{}.tbl", path, table);
+ let schema = get_schema(table);
+ let options = CsvReadOptions::new()
+ .schema(&schema)
+ .delimiter(b'|')
+ .has_header(false)
+ .file_extension(".tbl");
+ ctx.register_csv(table, &path, options)
+ .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))?;
+ }
+ "csv" => {
+ let path = format!("{}/{}", path, table);
+ let schema = get_schema(table);
+ let options =
CsvReadOptions::new().schema(&schema).has_header(true);
+ ctx.register_csv(table, &path, options)
+ .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))?;
+ }
+ "parquet" => {
+ let path = format!("{}/{}", path, table);
+ ctx.register_parquet(table, &path)
+ .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))?;
+ }
+ other => {
+ unimplemented!("Invalid file format '{}'", other);
+ }
+ }
}
+
+ let mut millis = vec![];
+
+ // run benchmark
+ let sql = get_query_sql(opt.query)?;
+ println!("Running benchmark with query {}:\n {}", opt.query, sql);
+ for i in 0..opt.iterations {
+ let start = Instant::now();
+ let df = ctx
+ .sql(&sql)
+ .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))?;
+ let mut batches = vec![];
+ let mut stream = df
+ .collect()
+ .await
+ .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))?;
+ while let Some(result) = stream.next().await {
+ let batch = result?;
+ batches.push(batch);
+ }
+ let elapsed = start.elapsed().as_secs_f64() * 1000.0;
+ millis.push(elapsed as f64);
+ println!("Query {} iteration {} took {:.1} ms", opt.query, i, elapsed);
+ if opt.debug {
+ pretty::print_batches(&batches)?;
+ }
+ }
+
+ let avg = millis.iter().sum::<f64>() / millis.len() as f64;
+ println!("Query {} avg time: {:.2} ms", opt.query, avg);
+
+ Ok(())
+}
+
+fn get_query_sql(query: usize) -> Result<String> {
+ if query > 0 && query < 23 {
+ let filename = format!("queries/q{}.sql", query);
+ Ok(fs::read_to_string(&filename).expect("failed to read query"))
+ } else {
+ Err(DataFusionError::Plan(
+ "invalid query. Expected value between 1 and 22".to_owned(),
+ ))
+ }
+}
+
+fn create_logical_plan(ctx: &mut ExecutionContext, query: usize) ->
Result<LogicalPlan> {
+ let sql = get_query_sql(query)?;
+ ctx.create_logical_plan(&sql)
}
async fn execute_query(
@@ -1668,8 +954,10 @@ mod tests {
file_format: "tbl".to_string(),
mem_table: false,
partitions: 16,
+ host: None,
+ port: None,
};
- let actual = benchmark(opt).await?;
+ let actual = benchmark_datafusion(opt).await?;
// assert schema equality without comparing nullable values
assert_eq!(
diff --git a/ballista/rust/benchmarks/tpch/tpch-gen.sh b/benchmarks/tpch-gen.sh
similarity index 97%
rename from ballista/rust/benchmarks/tpch/tpch-gen.sh
rename to benchmarks/tpch-gen.sh
index f5147f5..fef3480 100755
--- a/ballista/rust/benchmarks/tpch/tpch-gen.sh
+++ b/benchmarks/tpch-gen.sh
@@ -16,7 +16,7 @@
# specific language governing permissions and limitations
# under the License.
-BALLISTA_VERSION=0.4.2-SNAPSHOT
+BALLISTA_VERSION=0.5.0-SNAPSHOT
#set -e
diff --git a/ballista/rust/benchmarks/tpch/tpchgen.dockerfile
b/benchmarks/tpchgen.dockerfile
similarity index 100%
rename from ballista/rust/benchmarks/tpch/tpchgen.dockerfile
rename to benchmarks/tpchgen.dockerfile
diff --git a/dev/build-rust-base.sh b/dev/build-rust-base.sh
index e424909..1bedbd8 100755
--- a/dev/build-rust-base.sh
+++ b/dev/build-rust-base.sh
@@ -16,6 +16,6 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-BALLISTA_VERSION=0.4.2-SNAPSHOT
+BALLISTA_VERSION=0.5.0-SNAPSHOT
set -e
docker build -t ballistacompute/rust-base:$BALLISTA_VERSION -f
dev/docker/rust-base.dockerfile .
diff --git a/dev/build-rust.sh b/dev/build-rust.sh
index d31c524..5777d1e 100755
--- a/dev/build-rust.sh
+++ b/dev/build-rust.sh
@@ -17,7 +17,7 @@
# specific language governing permissions and limitations
# under the License.
-BALLISTA_VERSION=0.4.2-SNAPSHOT
+BALLISTA_VERSION=0.5.0-SNAPSHOT
set -e
diff --git a/dev/docker/rust.dockerfile b/dev/docker/rust.dockerfile
index 6505f3c..ba713b1 100644
--- a/dev/docker/rust.dockerfile
+++ b/dev/docker/rust.dockerfile
@@ -22,7 +22,7 @@
# as a mounted directory.
ARG RELEASE_FLAG=--release
-FROM ballistacompute/rust-base:0.4.2-SNAPSHOT AS base
+FROM ballistacompute/rust-base:0.5.0-SNAPSHOT AS base
WORKDIR /tmp/ballista
RUN apt-get -y install cmake
RUN cargo install cargo-chef
@@ -73,7 +73,7 @@ ENV RELEASE_FLAG=${RELEASE_FLAG}
RUN if [ -z "$RELEASE_FLAG" ]; then mv /tmp/ballista/target/debug/tpch /tpch;
else mv /tmp/ballista/target/release/tpch /tpch; fi
# Copy the binary into a new container for a smaller docker image
-FROM ballistacompute/rust-base:0.4.0-20210213
+FROM ballistacompute/rust-base:0.5.0-SNAPSHOT
COPY --from=builder /executor /
@@ -81,6 +81,10 @@ COPY --from=builder /scheduler /
COPY --from=builder /tpch /
+ADD benchmarks/run.sh /
+RUN mkdir /queries
+COPY benchmarks/queries/ /queries/
+
ENV RUST_LOG=info
ENV RUST_BACKTRACE=full
diff --git a/dev/integration-tests.sh b/dev/integration-tests.sh
index 6ed764e..06ab108 100755
--- a/dev/integration-tests.sh
+++ b/dev/integration-tests.sh
@@ -19,11 +19,11 @@
set -e
./dev/build-rust-base.sh
./dev/build-rust.sh
-pushd ballista/rust/benchmarks/tpch
+pushd benchmarks
./tpch-gen.sh
docker-compose up -d
-docker-compose run ballista-client ./run.sh
+docker-compose run ballista-client /run.sh
docker-compose down
popd
diff --git a/dev/release/rat_exclude_files.txt
b/dev/release/rat_exclude_files.txt
index f9eca7a..cef0a91 100644
--- a/dev/release/rat_exclude_files.txt
+++ b/dev/release/rat_exclude_files.txt
@@ -100,6 +100,6 @@ requirements.txt
*.scss
.gitattributes
rust-toolchain
-ballista/rust/benchmarks/tpch/queries/q*.sql
+benchmarks/queries/q*.sql
ballista/rust/scheduler/testdata/*
ballista/ui/scheduler/yarn.lock