This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 11b7b5c215 Add ClickBench queries to DataFusion benchmark runner
(#7060)
11b7b5c215 is described below
commit 11b7b5c215012231e5768fc5be3445c0254d0169
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu Jul 27 06:30:17 2023 -0500
Add ClickBench queries to DataFusion benchmark runner (#7060)
* Add clickbench query runner to benchmarks, update docs
* Fix numbering so it goes from 0 to 42
---
benchmarks/README.md | 91 +++++++++++--------
benchmarks/bench.sh | 12 ++-
benchmarks/queries/clickbench/README.txt | 1 +
benchmarks/queries/clickbench/queries.sql | 43 +++++++++
benchmarks/src/bin/dfbench.rs | 4 +-
benchmarks/src/bin/tpch.rs | 5 +-
benchmarks/src/clickbench.rs | 141 ++++++++++++++++++++++++++++++
benchmarks/src/lib.rs | 25 +-----
benchmarks/src/options.rs | 53 +++++++++++
benchmarks/src/tpch/convert.rs | 2 +-
benchmarks/src/tpch/run.rs | 67 ++++++++------
11 files changed, 354 insertions(+), 90 deletions(-)
diff --git a/benchmarks/README.md b/benchmarks/README.md
index cf8a20a823..b182311977 100644
--- a/benchmarks/README.md
+++ b/benchmarks/README.md
@@ -20,11 +20,14 @@
# DataFusion Benchmarks
This crate contains benchmarks based on popular public data sets and
-open source benchmark suites, making it easy to run more realistic
-benchmarks to help with performance and scalability testing of DataFusion.
+open source benchmark suites, to help with performance and scalability
+testing of DataFusion.
-# Benchmarks Against Other Engines
+## Other engines
+
+The benchmarks measure changes to DataFusion itself, rather than
+its performance against other engines. For competitive benchmarking,
DataFusion is included in the benchmark setups for several popular
benchmarks that compare performance with other engines. For example:
@@ -36,30 +39,35 @@ benchmarks that compare performance with other engines. For
example:
# Running the benchmarks
-## Running Benchmarks
+## `bench.sh`
-The easiest way to run benchmarks from DataFusion source checkouts is
-to use the [bench.sh](bench.sh) script. Usage instructions can be
-found with:
+The easiest way to run benchmarks is the [bench.sh](bench.sh)
+script. Usage instructions can be found with:
```shell
# show usage
./bench.sh
```
-## Generating Data
+## Generating data
+
+You can create / download the data for these benchmarks using the
[bench.sh](bench.sh) script:
-You can create data for all these benchmarks using the [bench.sh](bench.sh)
script:
+Create / download all datasets
```shell
./bench.sh data
```
-Data is generated in the `data` subdirectory and will not be checked
-in because this directory has been added to the `.gitignore` file.
+Create / download a specific dataset (TPCH)
+```shell
+./bench.sh data tpch
+```
-## Example to compare peformance on main to a branch
+Data is placed in the `data` subdirectory.
+
+## Comparing performance of main and a branch
```shell
git checkout main
@@ -143,27 +151,17 @@ Benchmark tpch_mem.json
```
-# Benchmark Descriptions:
-
-## `tpch` Benchmark derived from TPC-H
-
-These benchmarks are derived from the [TPC-H][1] benchmark. And we use this
repo as the source of tpch-gen and answers:
-https://github.com/databricks/tpch-dbgen.git, based on
[2.17.1](https://www.tpc.org/tpc_documents_current_versions/pdf/tpc-h_v2.17.1.pdf)
version of TPC-H.
+### Running Benchmarks Manually
-
-### Running the DataFusion Benchmarks Manually
-
-The benchmark can then be run (assuming the data created from `dbgen` is in
`./data`) with a command such as:
+Assuming data in the `data` directory, the `tpch` benchmark can be run with a
command like this
```bash
-cargo run --release --bin tpch -- benchmark datafusion --iterations 3 --path
./data --format tbl --query 1 --batch-size 4096
+cargo run --release --bin dfbench -- tpch --iterations 3 --path ./data
--format tbl --query 1 --batch-size 4096
```
-If you omit `--query=<query_id>` argument, then all benchmarks will be run one
by one (from query 1 to query 22).
+See the help for more details
-```bash
-cargo run --release --bin tpch -- benchmark datafusion --iterations 1 --path
./data --format tbl --batch-size 4096
-```
+### Different features
You can enable the features `simd` (to use SIMD instructions, `cargo nightly`
is required.) and/or `mimalloc` or `snmalloc` (to use either the mimalloc or
snmalloc allocator) as features by passing them in as `--features`:
@@ -171,12 +169,6 @@ You can enable the features `simd` (to use SIMD
instructions, `cargo nightly` is
cargo run --release --features "simd mimalloc" --bin tpch -- benchmark
datafusion --iterations 3 --path ./data --format tbl --query 1 --batch-size 4096
```
-If you want to disable collection of statistics (and thus cost based
optimizers), you can pass `--disable-statistics` flag.
-
-```bash
-cargo run --release --bin tpch -- benchmark datafusion --iterations 3 --path
/mnt/tpch-parquet --format parquet --query 17 --disable-statistics
-```
-
The benchmark program also supports CSV and Parquet input file formats and a
utility is provided to convert from `tbl`
(generated by the `dbgen` utility) to CSV and Parquet.
@@ -188,9 +180,10 @@ Or if you want to verify and run all the queries in the
benchmark, you can just
### Comparing results between runs
-Any `tpch` execution with `-o <dir>` argument will produce a summary file
right under the `<dir>`
-directory. It is a JSON serialized form of all the runs that happened as well
as the runtime metadata
-(number of cores, DataFusion version, etc.).
+Any `dfbench` execution with `-o <dir>` argument will produce a
+summary JSON in the specified directory. This file contains a
+serialized form of all the runs that happened and runtime
+metadata (number of cores, DataFusion version, etc.).
```shell
$ git checkout main
@@ -253,6 +246,32 @@ Query 1 iteration 0 took 1956.1 ms
Query 1 avg time: 1956.11 ms
```
+# Benchmark Descriptions
+
+## `dfbench`
+
+The `dfbench` program contains subcommands to run various benchmarks.
+
+Full help can be found in the relevant sub command. For example to get help
for tpch,
+run `cargo run --release --bin dfbench tpch --help`
+
+```shell
+cargo run --release --bin dfbench --help
+...
+datafusion-benchmarks 27.0.0
+benchmark command
+
+USAGE:
+ dfbench <SUBCOMMAND>
+
+SUBCOMMANDS:
+ clickbench Run the clickbench benchmark
+ help Prints this message or the help of the given subcommand(s)
+ tpch Run the tpch benchmark.
+ tpch-convert Convert tpch .slt files to .parquet or .csv files
+
+```
+
## NYC Taxi Benchmark
These benchmarks are based on the [New York Taxi and Limousine Commission][2]
data set.
diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh
index f71094a425..ca58e49f60 100755
--- a/benchmarks/bench.sh
+++ b/benchmarks/bench.sh
@@ -35,7 +35,7 @@ BENCHMARK=all
DATAFUSION_DIR=${DATAFUSION_DIR:-$SCRIPT_DIR/..}
DATA_DIR=${DATA_DIR:-$SCRIPT_DIR/data}
#CARGO_COMMAND=${CARGO_COMMAND:-"cargo run --release"}
-CARGO_COMMAND=${CARGO_COMMAND:-"cargo run --profile release-nonlto"} # TEMP:
for faster iterations
+CARGO_COMMAND=${CARGO_COMMAND:-"cargo run --profile release-nonlto"} # for
faster iterations
usage() {
echo "
@@ -386,12 +386,18 @@ data_clickbench_partitioned() {
# Runs the clickbench benchmark with a single large parquet file
run_clickbench_1() {
- echo "NOTICE: ClickBench (1 parquet file) is not yet supported"
+ RESULTS_FILE="${RESULTS_DIR}/clickbench_1.json"
+ echo "RESULTS_FILE: ${RESULTS_FILE}"
+ echo "Running clickbench (1 file) benchmark..."
+ $CARGO_COMMAND --bin dfbench -- clickbench --iterations 10 --path
"${DATA_DIR}/hits.parquet" --queries-path
"${SCRIPT_DIR}/queries/clickbench/queries.sql" -o ${RESULTS_FILE}
}
# Runs the clickbench benchmark with a single large parquet file
run_clickbench_partitioned() {
- echo "NOTICE: ClickBench (1 parquet file) is not yet supported"
+ RESULTS_FILE="${RESULTS_DIR}/clickbench_1.json"
+ echo "RESULTS_FILE: ${RESULTS_FILE}"
+ echo "Running clickbench (partitioned, 100 files) benchmark..."
+ $CARGO_COMMAND --bin dfbench -- clickbench --iterations 10 --path
"${DATA_DIR}/hits_partitioned" --queries-path
"${SCRIPT_DIR}/queries/clickbench/queries.sql" -o ${RESULTS_FILE}
}
compare_benchmarks() {
diff --git a/benchmarks/queries/clickbench/README.txt
b/benchmarks/queries/clickbench/README.txt
new file mode 100644
index 0000000000..b46900956e
--- /dev/null
+++ b/benchmarks/queries/clickbench/README.txt
@@ -0,0 +1 @@
+Downloaded from
https://github.com/ClickHouse/ClickBench/blob/main/datafusion/queries.sql
diff --git a/benchmarks/queries/clickbench/queries.sql
b/benchmarks/queries/clickbench/queries.sql
new file mode 100644
index 0000000000..52e72e02e1
--- /dev/null
+++ b/benchmarks/queries/clickbench/queries.sql
@@ -0,0 +1,43 @@
+SELECT COUNT(*) FROM hits;
+SELECT COUNT(*) FROM hits WHERE "AdvEngineID" <> 0;
+SELECT SUM("AdvEngineID"), COUNT(*), AVG("ResolutionWidth") FROM hits;
+SELECT AVG("UserID") FROM hits;
+SELECT COUNT(DISTINCT "UserID") FROM hits;
+SELECT COUNT(DISTINCT "SearchPhrase") FROM hits;
+SELECT MIN("EventDate"::INT::DATE), MAX("EventDate"::INT::DATE) FROM hits;
+SELECT "AdvEngineID", COUNT(*) FROM hits WHERE "AdvEngineID" <> 0 GROUP BY
"AdvEngineID" ORDER BY COUNT(*) DESC;
+SELECT "RegionID", COUNT(DISTINCT "UserID") AS u FROM hits GROUP BY "RegionID"
ORDER BY u DESC LIMIT 10;
+SELECT "RegionID", SUM("AdvEngineID"), COUNT(*) AS c, AVG("ResolutionWidth"),
COUNT(DISTINCT "UserID") FROM hits GROUP BY "RegionID" ORDER BY c DESC LIMIT 10;
+SELECT "MobilePhoneModel", COUNT(DISTINCT "UserID") AS u FROM hits WHERE
"MobilePhoneModel" <> '' GROUP BY "MobilePhoneModel" ORDER BY u DESC LIMIT 10;
+SELECT "MobilePhone", "MobilePhoneModel", COUNT(DISTINCT "UserID") AS u FROM
hits WHERE "MobilePhoneModel" <> '' GROUP BY "MobilePhone", "MobilePhoneModel"
ORDER BY u DESC LIMIT 10;
+SELECT "SearchPhrase", COUNT(*) AS c FROM hits WHERE "SearchPhrase" <> ''
GROUP BY "SearchPhrase" ORDER BY c DESC LIMIT 10;
+SELECT "SearchPhrase", COUNT(DISTINCT "UserID") AS u FROM hits WHERE
"SearchPhrase" <> '' GROUP BY "SearchPhrase" ORDER BY u DESC LIMIT 10;
+SELECT "SearchEngineID", "SearchPhrase", COUNT(*) AS c FROM hits WHERE
"SearchPhrase" <> '' GROUP BY "SearchEngineID", "SearchPhrase" ORDER BY c DESC
LIMIT 10;
+SELECT "UserID", COUNT(*) FROM hits GROUP BY "UserID" ORDER BY COUNT(*) DESC
LIMIT 10;
+SELECT "UserID", "SearchPhrase", COUNT(*) FROM hits GROUP BY "UserID",
"SearchPhrase" ORDER BY COUNT(*) DESC LIMIT 10;
+SELECT "UserID", "SearchPhrase", COUNT(*) FROM hits GROUP BY "UserID",
"SearchPhrase" LIMIT 10;
+SELECT "UserID", extract(minute FROM to_timestamp_seconds("EventTime")) AS m,
"SearchPhrase", COUNT(*) FROM hits GROUP BY "UserID", m, "SearchPhrase" ORDER
BY COUNT(*) DESC LIMIT 10;
+SELECT "UserID" FROM hits WHERE "UserID" = 435090932899640449;
+SELECT COUNT(*) FROM hits WHERE "URL" LIKE '%google%';
+SELECT "SearchPhrase", MIN("URL"), COUNT(*) AS c FROM hits WHERE "URL" LIKE
'%google%' AND "SearchPhrase" <> '' GROUP BY "SearchPhrase" ORDER BY c DESC
LIMIT 10;
+SELECT "SearchPhrase", MIN("URL"), MIN("Title"), COUNT(*) AS c, COUNT(DISTINCT
"UserID") FROM hits WHERE "Title" LIKE '%Google%' AND "URL" NOT LIKE
'%.google.%' AND "SearchPhrase" <> '' GROUP BY "SearchPhrase" ORDER BY c DESC
LIMIT 10;
+SELECT * FROM hits WHERE "URL" LIKE '%google%' ORDER BY
to_timestamp_seconds("EventTime") LIMIT 10;
+SELECT "SearchPhrase" FROM hits WHERE "SearchPhrase" <> '' ORDER BY
to_timestamp_seconds("EventTime") LIMIT 10;
+SELECT "SearchPhrase" FROM hits WHERE "SearchPhrase" <> '' ORDER BY
"SearchPhrase" LIMIT 10;
+SELECT "SearchPhrase" FROM hits WHERE "SearchPhrase" <> '' ORDER BY
to_timestamp_seconds("EventTime"), "SearchPhrase" LIMIT 10;
+SELECT "CounterID", AVG(length("URL")) AS l, COUNT(*) AS c FROM hits WHERE
"URL" <> '' GROUP BY "CounterID" HAVING COUNT(*) > 100000 ORDER BY l DESC LIMIT
25;
+SELECT REGEXP_REPLACE("Referer", '^https?://(?:www\.)?([^/]+)/.*$', '\1') AS
k, AVG(length("Referer")) AS l, COUNT(*) AS c, MIN("Referer") FROM hits WHERE
"Referer" <> '' GROUP BY k HAVING COUNT(*) > 100000 ORDER BY l DESC LIMIT 25;
+SELECT SUM("ResolutionWidth"), SUM("ResolutionWidth" + 1),
SUM("ResolutionWidth" + 2), SUM("ResolutionWidth" + 3), SUM("ResolutionWidth" +
4), SUM("ResolutionWidth" + 5), SUM("ResolutionWidth" + 6),
SUM("ResolutionWidth" + 7), SUM("ResolutionWidth" + 8), SUM("ResolutionWidth" +
9), SUM("ResolutionWidth" + 10), SUM("ResolutionWidth" + 11),
SUM("ResolutionWidth" + 12), SUM("ResolutionWidth" + 13), SUM("ResolutionWidth"
+ 14), SUM("ResolutionWidth" + 15), SUM("ResolutionWidth" + 16), SUM("R [...]
+SELECT "SearchEngineID", "ClientIP", COUNT(*) AS c, SUM("IsRefresh"),
AVG("ResolutionWidth") FROM hits WHERE "SearchPhrase" <> '' GROUP BY
"SearchEngineID", "ClientIP" ORDER BY c DESC LIMIT 10;
+SELECT "WatchID", "ClientIP", COUNT(*) AS c, SUM("IsRefresh"),
AVG("ResolutionWidth") FROM hits WHERE "SearchPhrase" <> '' GROUP BY "WatchID",
"ClientIP" ORDER BY c DESC LIMIT 10;
+SELECT "WatchID", "ClientIP", COUNT(*) AS c, SUM("IsRefresh"),
AVG("ResolutionWidth") FROM hits GROUP BY "WatchID", "ClientIP" ORDER BY c DESC
LIMIT 10;
+SELECT "URL", COUNT(*) AS c FROM hits GROUP BY "URL" ORDER BY c DESC LIMIT 10;
+SELECT 1, "URL", COUNT(*) AS c FROM hits GROUP BY 1, "URL" ORDER BY c DESC
LIMIT 10;
+SELECT "ClientIP", "ClientIP" - 1, "ClientIP" - 2, "ClientIP" - 3, COUNT(*) AS
c FROM hits GROUP BY "ClientIP", "ClientIP" - 1, "ClientIP" - 2, "ClientIP" - 3
ORDER BY c DESC LIMIT 10;
+SELECT "URL", COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND
"EventDate"::INT::DATE >= '2013-07-01' AND "EventDate"::INT::DATE <=
'2013-07-31' AND "DontCountHits" = 0 AND "IsRefresh" = 0 AND "URL" <> '' GROUP
BY "URL" ORDER BY PageViews DESC LIMIT 10;
+SELECT "Title", COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND
"EventDate"::INT::DATE >= '2013-07-01' AND "EventDate"::INT::DATE <=
'2013-07-31' AND "DontCountHits" = 0 AND "IsRefresh" = 0 AND "Title" <> ''
GROUP BY "Title" ORDER BY PageViews DESC LIMIT 10;
+SELECT "URL", COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND
"EventDate"::INT::DATE >= '2013-07-01' AND "EventDate"::INT::DATE <=
'2013-07-31' AND "IsRefresh" = 0 AND "IsLink" <> 0 AND "IsDownload" = 0 GROUP
BY "URL" ORDER BY PageViews DESC LIMIT 10 OFFSET 1000;
+SELECT "TraficSourceID", "SearchEngineID", "AdvEngineID", CASE WHEN
("SearchEngineID" = 0 AND "AdvEngineID" = 0) THEN "Referer" ELSE '' END AS Src,
"URL" AS Dst, COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND
"EventDate"::INT::DATE >= '2013-07-01' AND "EventDate"::INT::DATE <=
'2013-07-31' AND "IsRefresh" = 0 GROUP BY "TraficSourceID", "SearchEngineID",
"AdvEngineID", Src, Dst ORDER BY PageViews DESC LIMIT 10 OFFSET 1000;
+SELECT "URLHash", "EventDate"::INT::DATE, COUNT(*) AS PageViews FROM hits
WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-01' AND
"EventDate"::INT::DATE <= '2013-07-31' AND "IsRefresh" = 0 AND "TraficSourceID"
IN (-1, 6) AND "RefererHash" = 3594120000172545465 GROUP BY "URLHash",
"EventDate"::INT::DATE ORDER BY PageViews DESC LIMIT 10 OFFSET 100;
+SELECT "WindowClientWidth", "WindowClientHeight", COUNT(*) AS PageViews FROM
hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-01' AND
"EventDate"::INT::DATE <= '2013-07-31' AND "IsRefresh" = 0 AND "DontCountHits"
= 0 AND "URLHash" = 2868770270353813622 GROUP BY "WindowClientWidth",
"WindowClientHeight" ORDER BY PageViews DESC LIMIT 10 OFFSET 10000;
+SELECT DATE_TRUNC('minute', to_timestamp_seconds("EventTime")) AS M, COUNT(*)
AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >=
'2013-07-14' AND "EventDate"::INT::DATE <= '2013-07-15' AND "IsRefresh" = 0 AND
"DontCountHits" = 0 GROUP BY DATE_TRUNC('minute',
to_timestamp_seconds("EventTime")) ORDER BY DATE_TRUNC('minute', M) LIMIT 10
OFFSET 1000;
diff --git a/benchmarks/src/bin/dfbench.rs b/benchmarks/src/bin/dfbench.rs
index f4ba8bc975..d5a17a2ab5 100644
--- a/benchmarks/src/bin/dfbench.rs
+++ b/benchmarks/src/bin/dfbench.rs
@@ -28,13 +28,14 @@ static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
#[global_allocator]
static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;
-use datafusion_benchmarks::tpch;
+use datafusion_benchmarks::{clickbench, tpch};
#[derive(Debug, StructOpt)]
#[structopt(about = "benchmark command")]
enum Options {
Tpch(tpch::RunOpt),
TpchConvert(tpch::ConvertOpt),
+ Clickbench(clickbench::RunOpt),
}
// Main benchmark runner entrypoint
@@ -45,5 +46,6 @@ pub async fn main() -> Result<()> {
match Options::from_args() {
Options::Tpch(opt) => opt.run().await,
Options::TpchConvert(opt) => opt.run().await,
+ Options::Clickbench(opt) => opt.run().await,
}
}
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 757136d231..9548093570 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -43,9 +43,10 @@ enum TpchOpt {
Convert(tpch::ConvertOpt),
}
-/// 'tpch' entry point, with tortured command line arguments
+/// 'tpch' entry point, with tortured command line arguments. Please
+/// use `dbbench` instead.
///
-/// This is kept to be backwards compatible with the benchmark names prior to
+/// Note: this is kept to be backwards compatible with the benchmark names
prior to
/// <https://github.com/apache/arrow-datafusion/issues/6994>
#[tokio::main]
async fn main() -> Result<()> {
diff --git a/benchmarks/src/clickbench.rs b/benchmarks/src/clickbench.rs
new file mode 100644
index 0000000000..c884af8094
--- /dev/null
+++ b/benchmarks/src/clickbench.rs
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{path::PathBuf, time::Instant};
+
+use datafusion::{
+ error::{DataFusionError, Result},
+ prelude::SessionContext,
+};
+use structopt::StructOpt;
+
+use crate::{BenchmarkRun, CommonOpt};
+
+/// Run the clickbench benchmark
+///
+/// The ClickBench[1] benchmarks are widely cited in the industry and
+/// focus on grouping / aggregation / filtering. This runner uses the
+/// scripts and queries from [2].
+///
+/// [1]: https://github.com/ClickHouse/ClickBench
+/// [2]: https://github.com/ClickHouse/ClickBench/tree/main/datafusion
+#[derive(Debug, StructOpt, Clone)]
+#[structopt(verbatim_doc_comment)]
+pub struct RunOpt {
+ /// Query number (between 0 and 42). If not specified, runs all queries
+ #[structopt(short, long)]
+ query: Option<usize>,
+
+ /// Common options
+ #[structopt(flatten)]
+ common: CommonOpt,
+
+ /// Path to hits.parquet (single file) or `hits_partitioned`
+ /// (partitioned, 100 files)
+ #[structopt(
+ parse(from_os_str),
+ short = "p",
+ long = "path",
+ default_value = "benchmarks/data/hits.parquet"
+ )]
+ path: PathBuf,
+
+ /// Path to queries.sql (single file)
+ #[structopt(
+ parse(from_os_str),
+ short = "r",
+ long = "queries-path",
+ default_value = "benchmarks/queries/clickbench/queries.sql"
+ )]
+ queries_path: PathBuf,
+
+ /// If present, write results json here
+ #[structopt(parse(from_os_str), short = "o", long = "output")]
+ output_path: Option<PathBuf>,
+}
+
+const CLICKBENCH_QUERY_START_ID: usize = 0;
+const CLICKBENCH_QUERY_END_ID: usize = 42;
+
+impl RunOpt {
+ pub async fn run(self) -> Result<()> {
+ println!("Running benchmarks with the following options: {self:?}");
+ let query_range = match self.query {
+ Some(query_id) => query_id..=query_id,
+ None => CLICKBENCH_QUERY_START_ID..=CLICKBENCH_QUERY_END_ID,
+ };
+
+ let config = self.common.config();
+ let ctx = SessionContext::with_config(config);
+ self.register_hits(&ctx).await?;
+
+ let iterations = self.common.iterations();
+ let mut benchmark_run = BenchmarkRun::new();
+ for query_id in query_range {
+ benchmark_run.start_new_case(&format!("Query {query_id}"));
+ let sql = self.get_query(query_id)?;
+ println!("Q{query_id}: {sql}");
+
+ for i in 0..iterations {
+ let start = Instant::now();
+ let results = ctx.sql(&sql).await?.collect().await?;
+ let elapsed = start.elapsed();
+ let ms = elapsed.as_secs_f64() * 1000.0;
+ let row_count: usize = results.iter().map(|b|
b.num_rows()).sum();
+ println!(
+ "Query {query_id} iteration {i} took {ms:.1} ms and
returned {row_count} rows"
+ );
+ benchmark_run.write_iter(elapsed, row_count);
+ }
+ }
+ benchmark_run.maybe_write_json(self.output_path.as_ref())?;
+ Ok(())
+ }
+
+ /// Registrs the `hits.parquet` as a table named `hits`
+ async fn register_hits(&self, ctx: &SessionContext) -> Result<()> {
+ let options = Default::default();
+ let path = self.path.as_os_str().to_str().unwrap();
+ ctx.register_parquet("hits", path, options)
+ .await
+ .map_err(|e| {
+ DataFusionError::Context(
+ format!("Registering 'hits' as {path}"),
+ Box::new(e),
+ )
+ })
+ }
+
+ /// Returns the text of query `query_id`
+ fn get_query(&self, query_id: usize) -> Result<String> {
+ if query_id > CLICKBENCH_QUERY_END_ID {
+ return Err(DataFusionError::Execution(format!(
+ "Invalid query id {query_id}. Must be between
{CLICKBENCH_QUERY_START_ID} and {CLICKBENCH_QUERY_END_ID}"
+ )));
+ }
+
+ let path = self.queries_path.as_path();
+
+ // ClickBench has all queries in a single file identified by line
number
+ let all_queries = std::fs::read_to_string(path).map_err(|e| {
+ DataFusionError::Execution(format!("Could not open {path:?}: {e}"))
+ })?;
+ let all_queries: Vec<_> = all_queries.lines().collect();
+
+ Ok(all_queries.get(query_id).map(|s| s.to_string()).unwrap())
+ }
+}
diff --git a/benchmarks/src/lib.rs b/benchmarks/src/lib.rs
index 9d5530d31f..8cab151115 100644
--- a/benchmarks/src/lib.rs
+++ b/benchmarks/src/lib.rs
@@ -16,27 +16,10 @@
// under the License.
//! DataFusion benchmark runner
-use datafusion::error::Result;
-use structopt::StructOpt;
-
-pub mod run;
+pub mod clickbench;
pub mod tpch;
+mod options;
+mod run;
+pub use options::CommonOpt;
pub use run::{BenchQuery, BenchmarkRun};
-
-#[derive(Debug, StructOpt)]
-#[structopt(about = "benchmark command")]
-enum Options {
- Tpch(tpch::RunOpt),
- TpchConvert(tpch::ConvertOpt),
-}
-
-// Main benchmark runner entrypoint
-pub async fn main() -> Result<()> {
- env_logger::init();
-
- match Options::from_args() {
- Options::Tpch(opt) => opt.run().await,
- Options::TpchConvert(opt) => opt.run().await,
- }
-}
diff --git a/benchmarks/src/options.rs b/benchmarks/src/options.rs
new file mode 100644
index 0000000000..d285741e23
--- /dev/null
+++ b/benchmarks/src/options.rs
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion::prelude::SessionConfig;
+use structopt::StructOpt;
+
+// Common benchmark options (don't use doc comments otherwise this doc
+// shows up in help files)
+#[derive(Debug, StructOpt, Clone)]
+pub struct CommonOpt {
+ /// Number of iterations of each test run
+ #[structopt(short = "i", long = "iterations", default_value = "3")]
+ pub iterations: usize,
+
+ /// Number of partitions to process in parallel
+ #[structopt(short = "n", long = "partitions", default_value = "2")]
+ pub partitions: usize,
+
+ /// Batch size when reading CSV or Parquet files
+ #[structopt(short = "s", long = "batch-size", default_value = "8192")]
+ pub batch_size: usize,
+}
+
+impl CommonOpt {
+ /// Return an appropriately configured `SessionConfig`
+ pub fn config(&self) -> SessionConfig {
+ SessionConfig::new()
+ .with_target_partitions(self.partitions)
+ .with_batch_size(self.batch_size)
+ }
+
+ pub fn iterations(&self) -> usize {
+ self.iterations
+ }
+
+ pub fn partitions(&self) -> usize {
+ self.partitions
+ }
+}
diff --git a/benchmarks/src/tpch/convert.rs b/benchmarks/src/tpch/convert.rs
index c65ced4c7b..f5fcbb0547 100644
--- a/benchmarks/src/tpch/convert.rs
+++ b/benchmarks/src/tpch/convert.rs
@@ -58,7 +58,7 @@ pub struct ConvertOpt {
}
impl ConvertOpt {
- pub async fn run(&self) -> Result<()> {
+ pub async fn run(self) -> Result<()> {
let compression = self.compression()?;
let input_path = self.input_path.to_str().unwrap();
diff --git a/benchmarks/src/tpch/run.rs b/benchmarks/src/tpch/run.rs
index 6aada30bc7..5b43f2b93f 100644
--- a/benchmarks/src/tpch/run.rs
+++ b/benchmarks/src/tpch/run.rs
@@ -16,7 +16,7 @@
// under the License.
use super::get_query_sql;
-use crate::BenchmarkRun;
+use crate::{BenchmarkRun, CommonOpt};
use arrow::record_batch::RecordBatch;
use arrow::util::pretty::{self, pretty_format_batches};
use datafusion::datasource::file_format::csv::{CsvFormat,
DEFAULT_CSV_EXTENSION};
@@ -42,8 +42,17 @@ use structopt::StructOpt;
use super::{get_tbl_tpch_table_schema, get_tpch_table_schema, TPCH_TABLES};
-/// Run the tpch benchmark
+/// Run the tpch benchmark.
+///
+/// This benchmarks is derived from the [TPC-H][1] version
+/// [2.17.1]. The data and answers are generated using `tpch-gen` from
+/// [2].
+///
+/// [1]: http://www.tpc.org/tpch/
+/// [2]: https://github.com/databricks/tpch-dbgen.git,
+/// [2.17.1]:
https://www.tpc.org/tpc_documents_current_versions/pdf/tpc-h_v2.17.1.pdf
#[derive(Debug, StructOpt, Clone)]
+#[structopt(verbatim_doc_comment)]
pub struct RunOpt {
/// Query number. If not specified, runs all queries
#[structopt(short, long)]
@@ -53,17 +62,9 @@ pub struct RunOpt {
#[structopt(short, long)]
debug: bool,
- /// Number of iterations of each test run
- #[structopt(short = "i", long = "iterations", default_value = "3")]
- iterations: usize,
-
- /// Number of partitions to process in parallel
- #[structopt(short = "n", long = "partitions", default_value = "2")]
- partitions: usize,
-
- /// Batch size when reading CSV or Parquet files
- #[structopt(short = "s", long = "batch-size", default_value = "8192")]
- batch_size: usize,
+ /// Common options
+ #[structopt(flatten)]
+ common: CommonOpt,
/// Path to data files
#[structopt(parse(from_os_str), required = true, short = "p", long =
"path")]
@@ -90,7 +91,7 @@ const TPCH_QUERY_START_ID: usize = 1;
const TPCH_QUERY_END_ID: usize = 22;
impl RunOpt {
- pub async fn run(&self) -> Result<()> {
+ pub async fn run(self) -> Result<()> {
println!("Running benchmarks with the following options: {self:?}");
let query_range = match self.query {
Some(query_id) => query_id..=query_id,
@@ -110,9 +111,9 @@ impl RunOpt {
}
async fn benchmark_query(&self, query_id: usize) ->
Result<Vec<QueryResult>> {
- let config = SessionConfig::new()
- .with_target_partitions(self.partitions)
- .with_batch_size(self.batch_size)
+ let config = self
+ .common
+ .config()
.with_collect_statistics(!self.disable_statistics);
let ctx = SessionContext::with_config(config);
@@ -122,7 +123,7 @@ impl RunOpt {
let mut millis = vec![];
// run benchmark
let mut query_results = vec![];
- for i in 0..self.iterations {
+ for i in 0..self.iterations() {
let start = Instant::now();
let sql = &get_query_sql(query_id)?;
@@ -169,7 +170,7 @@ impl RunOpt {
println!("Loading table '{table}' into memory");
let start = Instant::now();
let memtable =
- MemTable::load(table_provider, Some(self.partitions),
&ctx.state())
+ MemTable::load(table_provider, Some(self.partitions()),
&ctx.state())
.await?;
println!(
"Loaded table '{}' into memory in {} ms",
@@ -231,7 +232,7 @@ impl RunOpt {
) -> Result<Arc<dyn TableProvider>> {
let path = self.path.to_str().unwrap();
let table_format = self.file_format.as_str();
- let target_partitions = self.partitions;
+ let target_partitions = self.partitions();
// Obtain a snapshot of the SessionState
let state = ctx.state();
@@ -283,6 +284,14 @@ impl RunOpt {
Ok(Arc::new(ListingTable::try_new(config)?))
}
+
+ fn iterations(&self) -> usize {
+ self.common.iterations()
+ }
+
+ fn partitions(&self) -> usize {
+ self.common.partitions()
+ }
}
struct QueryResult {
@@ -318,12 +327,15 @@ mod tests {
async fn round_trip_logical_plan(query: usize) -> Result<()> {
let ctx = SessionContext::default();
let path = get_tpch_data_path()?;
- let opt = RunOpt {
- query: Some(query),
- debug: false,
+ let common = CommonOpt {
iterations: 1,
partitions: 2,
batch_size: 8192,
+ };
+ let opt = RunOpt {
+ query: Some(query),
+ debug: false,
+ common,
path: PathBuf::from(path.to_string()),
file_format: "tbl".to_string(),
mem_table: false,
@@ -347,12 +359,15 @@ mod tests {
async fn round_trip_physical_plan(query: usize) -> Result<()> {
let ctx = SessionContext::default();
let path = get_tpch_data_path()?;
- let opt = RunOpt {
- query: Some(query),
- debug: false,
+ let common = CommonOpt {
iterations: 1,
partitions: 2,
batch_size: 8192,
+ };
+ let opt = RunOpt {
+ query: Some(query),
+ debug: false,
+ common,
path: PathBuf::from(path.to_string()),
file_format: "tbl".to_string(),
mem_table: false,