This is an automated email from the ASF dual-hosted git repository. alamb pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push: new 226afc679e Add H2O.ai Database-like Ops benchmark to dfbench (groupby support) (#13996) 226afc679e is described below commit 226afc679e80fbb7d4c2ccc441aac88e02f89da1 Author: Qi Zhu <821684...@qq.com> AuthorDate: Sun Jan 12 19:02:00 2025 +0800 Add H2O.ai Database-like Ops benchmark to dfbench (groupby support) (#13996) * Add H2O.ai Database-like Ops benchmark to dfbench * Fix query and fmt * Change venv * Make sure venv version support falsa * Fix default path * Support groupby only now * fix * Address comments * fix * support python version higher * support higer python such as python 3.13 * Addressed new comments * Add specific query example --- benchmarks/README.md | 55 ++++++++---- benchmarks/bench.sh | 146 +++++++++++++++++++++++++++++++ benchmarks/queries/h2o/groupby.sql | 10 +++ benchmarks/queries/h2o/join.sql | 5 ++ benchmarks/src/bin/dfbench.rs | 6 +- benchmarks/src/bin/h2o.rs | 135 ---------------------------- benchmarks/src/h2o.rs | 175 +++++++++++++++++++++++++++++++++++++ benchmarks/src/lib.rs | 1 + 8 files changed, 379 insertions(+), 154 deletions(-) diff --git a/benchmarks/README.md b/benchmarks/README.md index cccd7f44f5..332cac8459 100644 --- a/benchmarks/README.md +++ b/benchmarks/README.md @@ -32,7 +32,7 @@ DataFusion is included in the benchmark setups for several popular benchmarks that compare performance with other engines. For example: * [ClickBench] scripts are in the [ClickBench repo](https://github.com/ClickHouse/ClickBench/tree/main/datafusion) -* [H2o.ai `db-benchmark`] scripts are in [db-benchmark](db-benchmark) directory +* [H2o.ai `db-benchmark`] scripts are in [db-benchmark](https://github.com/apache/datafusion/tree/main/benchmarks/src/h2o.rs) [ClickBench]: https://github.com/ClickHouse/ClickBench/tree/main [H2o.ai `db-benchmark`]: https://github.com/h2oai/db-benchmark @@ -405,31 +405,50 @@ cargo run --release --bin external_aggr -- benchmark -n 4 --iterations 3 -p '... ``` -# Older Benchmarks +## h2o benchmarks for groupby -## h2o benchmarks +### Generate data for h2o benchmarks +There are three options for generating data for h2o benchmarks: `small`, `medium`, and `big`. The data is generated in the `data` directory. +1. Generate small data (1e7 rows) ```bash -cargo run --release --bin h2o group-by --query 1 --path /mnt/bigdata/h2oai/N_1e7_K_1e2_single.csv --mem-table --debug +./bench.sh data h2o_small ``` -Example run: +2. Generate medium data (1e8 rows) +```bash +./bench.sh data h2o_medium +``` + + +3. Generate large data (1e9 rows) +```bash +./bench.sh data h2o_big +``` + +### Run h2o benchmarks +There are three options for running h2o benchmarks: `small`, `medium`, and `big`. +1. Run small data benchmark +```bash +./bench.sh run h2o_small ``` -Running benchmarks with the following options: GroupBy(GroupBy { query: 1, path: "/mnt/bigdata/h2oai/N_1e7_K_1e2_single.csv", debug: false }) -Executing select id1, sum(v1) as v1 from x group by id1 -+-------+--------+ -| id1 | v1 | -+-------+--------+ -| id063 | 199420 | -| id094 | 200127 | -| id044 | 198886 | -... -| id093 | 200132 | -| id003 | 199047 | -+-------+--------+ -h2o groupby query 1 took 1669 ms +2. Run medium data benchmark +```bash +./bench.sh run h2o_medium +``` + +3. Run large data benchmark +```bash +./bench.sh run h2o_big +``` + +4. Run a specific query with a specific data path + +For example, to run query 1 with the small data generated above: +```bash +cargo run --release --bin dfbench -- h2o --path ./benchmarks/data/h2o/G1_1e7_1e7_100_0.csv --query 1 ``` [1]: http://www.tpc.org/tpch/ diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh index bc44e24dfe..20cb32722c 100755 --- a/benchmarks/bench.sh +++ b/benchmarks/bench.sh @@ -80,6 +80,9 @@ clickbench_1: ClickBench queries against a single parquet file clickbench_partitioned: ClickBench queries against a partitioned (100 files) parquet clickbench_extended: ClickBench \"inspired\" queries against a single parquet (DataFusion specific) external_aggr: External aggregation benchmark +h2o_small: h2oai benchmark with small dataset (1e7 rows), default file format is csv +h2o_medium: h2oai benchmark with medium dataset (1e8 rows), default file format is csv +h2o_big: h2oai benchmark with large dataset (1e9 rows), default file format is csv ********** * Supported Configuration (Environment Variables) @@ -142,6 +145,9 @@ main() { all) data_tpch "1" data_tpch "10" + data_h2o "SMALL" + data_h2o "MEDIUM" + data_h2o "BIG" data_clickbench_1 data_clickbench_partitioned data_imdb @@ -172,6 +178,15 @@ main() { imdb) data_imdb ;; + h2o_small) + data_h2o "SMALL" "CSV" + ;; + h2o_medium) + data_h2o "MEDIUM" "CSV" + ;; + h2o_big) + data_h2o "BIG" "CSV" + ;; external_aggr) # same data as for tpch data_tpch "1" @@ -221,6 +236,9 @@ main() { run_clickbench_1 run_clickbench_partitioned run_clickbench_extended + run_h2o "SMALL" "PARQUET" "groupby" + run_h2o "MEDIUM" "PARQUET" "groupby" + run_h2o "BIG" "PARQUET" "groupby" run_imdb run_external_aggr ;; @@ -254,6 +272,15 @@ main() { imdb) run_imdb ;; + h2o_small) + run_h2o "SMALL" "CSV" "groupby" + ;; + h2o_medium) + run_h2o "MEDIUM" "CSV" "groupby" + ;; + h2o_big) + run_h2o "BIG" "CSV" "groupby" + ;; external_aggr) run_external_aggr ;; @@ -541,6 +568,125 @@ run_imdb() { $CARGO_COMMAND --bin imdb -- benchmark datafusion --iterations 5 --path "${IMDB_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" --format parquet -o "${RESULTS_FILE}" } +data_h2o() { + # Default values for size and data format + SIZE=${1:-"SMALL"} + DATA_FORMAT=${2:-"CSV"} + + # Function to compare Python versions + version_ge() { + [ "$(printf '%s\n' "$1" "$2" | sort -V | head -n1)" = "$2" ] + } + + export PYO3_USE_ABI3_FORWARD_COMPATIBILITY=1 + + # Find the highest available Python version (3.10 or higher) + REQUIRED_VERSION="3.10" + PYTHON_CMD=$(command -v python3 || true) + + if [ -n "$PYTHON_CMD" ]; then + PYTHON_VERSION=$($PYTHON_CMD -c "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}')") + if version_ge "$PYTHON_VERSION" "$REQUIRED_VERSION"; then + echo "Found Python version $PYTHON_VERSION, which is suitable." + else + echo "Python version $PYTHON_VERSION found, but version $REQUIRED_VERSION or higher is required." + PYTHON_CMD="" + fi + fi + + # Search for suitable Python versions if the default is unsuitable + if [ -z "$PYTHON_CMD" ]; then + # Loop through all available Python3 commands on the system + for CMD in $(compgen -c | grep -E '^python3(\.[0-9]+)?$'); do + if command -v "$CMD" &> /dev/null; then + PYTHON_VERSION=$($CMD -c "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}')") + if version_ge "$PYTHON_VERSION" "$REQUIRED_VERSION"; then + PYTHON_CMD="$CMD" + echo "Found suitable Python version: $PYTHON_VERSION ($CMD)" + break + fi + fi + done + fi + + # If no suitable Python version found, exit with an error + if [ -z "$PYTHON_CMD" ]; then + echo "Python 3.10 or higher is required. Please install it." + return 1 + fi + + echo "Using Python command: $PYTHON_CMD" + + # Install falsa and other dependencies + echo "Installing falsa..." + + # Set virtual environment directory + VIRTUAL_ENV="${PWD}/venv" + + # Create a virtual environment using the detected Python command + $PYTHON_CMD -m venv "$VIRTUAL_ENV" + + # Activate the virtual environment and install dependencies + source "$VIRTUAL_ENV/bin/activate" + + # Ensure 'falsa' is installed (avoid unnecessary reinstall) + pip install --quiet --upgrade falsa + + # Create directory if it doesn't exist + H2O_DIR="${DATA_DIR}/h2o" + mkdir -p "${H2O_DIR}" + + # Generate h2o test data + echo "Generating h2o test data in ${H2O_DIR} with size=${SIZE} and format=${DATA_FORMAT}" + falsa groupby --path-prefix="${H2O_DIR}" --size "${SIZE}" --data-format "${DATA_FORMAT}" + + # Deactivate virtual environment after completion + deactivate +} + +## todo now only support groupby, after https://github.com/mrpowers-io/falsa/issues/21 done, we can add support for join +run_h2o() { + # Default values for size and data format + SIZE=${1:-"SMALL"} + DATA_FORMAT=${2:-"CSV"} + DATA_FORMAT=$(echo "$DATA_FORMAT" | tr '[:upper:]' '[:lower:]') + RUN_Type=${3:-"groupby"} + + # Data directory and results file path + H2O_DIR="${DATA_DIR}/h2o" + RESULTS_FILE="${RESULTS_DIR}/h2o.json" + + echo "RESULTS_FILE: ${RESULTS_FILE}" + echo "Running h2o benchmark..." + + # Set the file name based on the size + case "$SIZE" in + "SMALL") + FILE_NAME="G1_1e7_1e7_100_0.${DATA_FORMAT}" # For small dataset + ;; + "MEDIUM") + FILE_NAME="G1_1e8_1e8_100_0.${DATA_FORMAT}" # For medium dataset + ;; + "BIG") + FILE_NAME="G1_1e9_1e9_100_0.${DATA_FORMAT}" # For big dataset + ;; + *) + echo "Invalid size. Valid options are SMALL, MEDIUM, or BIG." + return 1 + ;; + esac + + # Set the query file name based on the RUN_Type + QUERY_FILE="${SCRIPT_DIR}/queries/h2o/${RUN_Type}.sql" + + # Run the benchmark using the dynamically constructed file path and query file + $CARGO_COMMAND --bin dfbench -- h2o \ + --iterations 3 \ + --path "${H2O_DIR}/${FILE_NAME}" \ + --queries-path "${QUERY_FILE}" \ + -o "${RESULTS_FILE}" +} + # Runs the external aggregation benchmark run_external_aggr() { # Use TPC-H SF1 dataset diff --git a/benchmarks/queries/h2o/groupby.sql b/benchmarks/queries/h2o/groupby.sql new file mode 100644 index 0000000000..c2101ef8ad --- /dev/null +++ b/benchmarks/queries/h2o/groupby.sql @@ -0,0 +1,10 @@ +SELECT id1, SUM(v1) AS v1 FROM x GROUP BY id1; +SELECT id1, id2, SUM(v1) AS v1 FROM x GROUP BY id1, id2; +SELECT id3, SUM(v1) AS v1, AVG(v3) AS v3 FROM x GROUP BY id3; +SELECT id4, AVG(v1) AS v1, AVG(v2) AS v2, AVG(v3) AS v3 FROM x GROUP BY id4; +SELECT id6, SUM(v1) AS v1, SUM(v2) AS v2, SUM(v3) AS v3 FROM x GROUP BY id6; +SELECT id4, id5, MEDIAN(v3) AS median_v3, STDDEV(v3) AS sd_v3 FROM x GROUP BY id4, id5; +SELECT id3, MAX(v1) - MIN(v2) AS range_v1_v2 FROM x GROUP BY id3; +SELECT id6, largest2_v3 FROM (SELECT id6, v3 AS largest2_v3, ROW_NUMBER() OVER (PARTITION BY id6 ORDER BY v3 DESC) AS order_v3 FROM x WHERE v3 IS NOT NULL) sub_query WHERE order_v3 <= 2; +SELECT id2, id4, POWER(CORR(v1, v2), 2) AS r2 FROM x GROUP BY id2, id4; +SELECT id1, id2, id3, id4, id5, id6, SUM(v3) AS v3, COUNT(*) AS count FROM x GROUP BY id1, id2, id3, id4, id5, id6; diff --git a/benchmarks/queries/h2o/join.sql b/benchmarks/queries/h2o/join.sql new file mode 100644 index 0000000000..8546b9292d --- /dev/null +++ b/benchmarks/queries/h2o/join.sql @@ -0,0 +1,5 @@ +SELECT x.id1, x.id2, x.id3, x.id4 as xid4, small.id4 as smallid4, x.id5, x.id6, x.v1, small.v2 FROM x INNER JOIN small ON x.id1 = small.id1; +SELECT x.id1 as xid1, medium.id1 as mediumid1, x.id2, x.id3, x.id4 as xid4, medium.id4 as mediumid4, x.id5 as xid5, medium.id5 as mediumid5, x.id6, x.v1, medium.v2 FROM x INNER JOIN medium ON x.id2 = medium.id2; +SELECT x.id1 as xid1, medium.id1 as mediumid1, x.id2, x.id3, x.id4 as xid4, medium.id4 as mediumid4, x.id5 as xid5, medium.id5 as mediumid5, x.id6, x.v1, medium.v2 FROM x LEFT JOIN medium ON x.id2 = medium.id2; +SELECT x.id1 as xid1, medium.id1 as mediumid1, x.id2, x.id3, x.id4 as xid4, medium.id4 as mediumid4, x.id5 as xid5, medium.id5 as mediumid5, x.id6, x.v1, medium.v2 FROM x JOIN medium ON x.id5 = medium.id5; +SELECT x.id1 as xid1, large.id1 as largeid1, x.id2 as xid2, large.id2 as largeid2, x.id3, x.id4 as xid4, large.id4 as largeid4, x.id5 as xid5, large.id5 as largeid5, x.id6 as xid6, large.id6 as largeid6, x.v1, large.v2 FROM x JOIN large ON x.id3 = large.id3; diff --git a/benchmarks/src/bin/dfbench.rs b/benchmarks/src/bin/dfbench.rs index 81aa5437dd..db6c29f4a4 100644 --- a/benchmarks/src/bin/dfbench.rs +++ b/benchmarks/src/bin/dfbench.rs @@ -33,7 +33,9 @@ static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc; #[global_allocator] static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc; -use datafusion_benchmarks::{clickbench, imdb, parquet_filter, sort, sort_tpch, tpch}; +use datafusion_benchmarks::{ + clickbench, h2o, imdb, parquet_filter, sort, sort_tpch, tpch, +}; #[derive(Debug, StructOpt)] #[structopt(about = "benchmark command")] @@ -45,6 +47,7 @@ enum Options { Sort(sort::RunOpt), SortTpch(sort_tpch::RunOpt), Imdb(imdb::RunOpt), + H2o(h2o::RunOpt), } // Main benchmark runner entrypoint @@ -60,5 +63,6 @@ pub async fn main() -> Result<()> { Options::Sort(opt) => opt.run().await, Options::SortTpch(opt) => opt.run().await, Options::Imdb(opt) => opt.run().await, + Options::H2o(opt) => opt.run().await, } } diff --git a/benchmarks/src/bin/h2o.rs b/benchmarks/src/bin/h2o.rs deleted file mode 100644 index 328db3d85b..0000000000 --- a/benchmarks/src/bin/h2o.rs +++ /dev/null @@ -1,135 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! DataFusion h2o benchmarks - -use datafusion::arrow::datatypes::{DataType, Field, Schema}; -use datafusion::config::ConfigOptions; -use datafusion::datasource::file_format::csv::CsvFormat; -use datafusion::datasource::listing::{ - ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl, -}; -use datafusion::datasource::MemTable; -use datafusion::prelude::CsvReadOptions; -use datafusion::{arrow::util::pretty, error::Result, prelude::SessionContext}; -use datafusion_benchmarks::util::BenchmarkRun; -use datafusion_common::utils::get_available_parallelism; -use std::path::PathBuf; -use std::sync::Arc; -use structopt::StructOpt; -use tokio::time::Instant; - -#[derive(Debug, StructOpt)] -#[structopt(name = "datafusion-h2o", about = "DataFusion h2o benchmarks")] -enum Opt { - GroupBy(GroupBy), //TODO add Join queries -} - -#[derive(Debug, StructOpt)] -struct GroupBy { - /// Query number - #[structopt(short, long)] - query: usize, - /// Path to data file - #[structopt(parse(from_os_str), required = true, short = "p", long = "path")] - path: PathBuf, - /// Activate debug mode to see query results - #[structopt(short, long)] - debug: bool, - /// Load the data into a MemTable before executing the query - #[structopt(short = "m", long = "mem-table")] - mem_table: bool, - /// Path to machine readable output file - #[structopt(parse(from_os_str), short = "o", long = "output")] - output_path: Option<PathBuf>, -} - -#[tokio::main] -async fn main() -> Result<()> { - let opt = Opt::from_args(); - println!("Running benchmarks with the following options: {opt:?}"); - match opt { - Opt::GroupBy(config) => group_by(&config).await, - } -} - -async fn group_by(opt: &GroupBy) -> Result<()> { - let mut rundata = BenchmarkRun::new(); - let path = opt.path.to_str().unwrap(); - let mut config = ConfigOptions::from_env()?; - config.execution.batch_size = 65535; - - let ctx = SessionContext::new_with_config(config.into()); - - let schema = Schema::new(vec![ - Field::new("id1", DataType::Utf8, false), - Field::new("id2", DataType::Utf8, false), - Field::new("id3", DataType::Utf8, false), - Field::new("id4", DataType::Int32, false), - Field::new("id5", DataType::Int32, false), - Field::new("id6", DataType::Int32, false), - Field::new("v1", DataType::Int32, false), - Field::new("v2", DataType::Int32, false), - Field::new("v3", DataType::Float64, false), - ]); - - if opt.mem_table { - let listing_config = ListingTableConfig::new(ListingTableUrl::parse(path)?) - .with_listing_options(ListingOptions::new(Arc::new(CsvFormat::default()))) - .with_schema(Arc::new(schema)); - let csv = ListingTable::try_new(listing_config)?; - let partition_size = get_available_parallelism(); - let memtable = - MemTable::load(Arc::new(csv), Some(partition_size), &ctx.state()).await?; - ctx.register_table("x", Arc::new(memtable))?; - } else { - ctx.register_csv("x", path, CsvReadOptions::default().schema(&schema)) - .await?; - } - rundata.start_new_case(&opt.query.to_string()); - let sql = match opt.query { - 1 => "select id1, sum(v1) as v1 from x group by id1", - 2 => "select id1, id2, sum(v1) as v1 from x group by id1, id2", - 3 => "select id3, sum(v1) as v1, mean(v3) as v3 from x group by id3", - 4 => "select id4, mean(v1) as v1, mean(v2) as v2, mean(v3) as v3 from x group by id4", - 5 => "select id6, sum(v1) as v1, sum(v2) as v2, sum(v3) as v3 from x group by id6", - 6 => "select id4, id5, median(v3) as median_v3, stddev(v3) as sd_v3 from x group by id4, id5", - 7 => "select id3, max(v1)-min(v2) as range_v1_v2 from x group by id3", - 8 => "select id6, largest2_v3 from (select id6, v3 as largest2_v3, row_number() over (partition by id6 order by v3 desc) as order_v3 from x where v3 is not null) sub_query where order_v3 <= 2", - 9 => "select id2, id4, pow(corr(v1, v2), 2) as r2 from x group by id2, id4", - 10 => "select id1, id2, id3, id4, id5, id6, sum(v3) as v3, count(*) as count from x group by id1, id2, id3, id4, id5, id6", - _ => unimplemented!(), - }; - - println!("Executing {sql}"); - let start = Instant::now(); - let df = ctx.sql(sql).await?; - let batches = df.collect().await?; - let elapsed = start.elapsed(); - let numrows = batches.iter().map(|b| b.num_rows()).sum::<usize>(); - if opt.debug { - pretty::print_batches(&batches)?; - } - rundata.write_iter(elapsed, numrows); - println!( - "h2o groupby query {} took {} ms", - opt.query, - elapsed.as_secs_f64() * 1000.0 - ); - rundata.maybe_write_json(opt.output_path.as_ref())?; - Ok(()) -} diff --git a/benchmarks/src/h2o.rs b/benchmarks/src/h2o.rs new file mode 100644 index 0000000000..53a516ceb5 --- /dev/null +++ b/benchmarks/src/h2o.rs @@ -0,0 +1,175 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::util::{BenchmarkRun, CommonOpt}; +use datafusion::{error::Result, prelude::SessionContext}; +use datafusion_common::{exec_datafusion_err, instant::Instant, DataFusionError}; +use std::path::{Path, PathBuf}; +use structopt::StructOpt; + +/// Run the H2O benchmark +#[derive(Debug, StructOpt, Clone)] +#[structopt(verbatim_doc_comment)] +pub struct RunOpt { + #[structopt(short, long)] + query: Option<usize>, + + /// Common options + #[structopt(flatten)] + common: CommonOpt, + + /// Path to queries.sql (single file) + /// default value is the groupby.sql file in the h2o benchmark + #[structopt( + parse(from_os_str), + short = "r", + long = "queries-path", + default_value = "benchmarks/queries/h2o/groupby.sql" + )] + queries_path: PathBuf, + + /// Path to data file (parquet or csv) + /// Default value is the G1_1e7_1e7_100_0.csv file in the h2o benchmark + /// This is the small csv file with 10^7 rows + #[structopt( + parse(from_os_str), + short = "p", + long = "path", + default_value = "benchmarks/data/h2o/G1_1e7_1e7_100_0.csv" + )] + path: PathBuf, + + /// If present, write results json here + #[structopt(parse(from_os_str), short = "o", long = "output")] + output_path: Option<PathBuf>, +} + +impl RunOpt { + pub async fn run(self) -> Result<()> { + println!("Running benchmarks with the following options: {self:?}"); + let queries = AllQueries::try_new(&self.queries_path)?; + let query_range = match self.query { + Some(query_id) => query_id..=query_id, + None => queries.min_query_id()..=queries.max_query_id(), + }; + + let config = self.common.config(); + let ctx = SessionContext::new_with_config(config); + + // Register data + self.register_data(&ctx).await?; + + let iterations = self.common.iterations; + let mut benchmark_run = BenchmarkRun::new(); + for query_id in query_range { + benchmark_run.start_new_case(&format!("Query {query_id}")); + let sql = queries.get_query(query_id)?; + println!("Q{query_id}: {sql}"); + + for i in 1..=iterations { + let start = Instant::now(); + let results = ctx.sql(sql).await?.collect().await?; + let elapsed = start.elapsed(); + let ms = elapsed.as_secs_f64() * 1000.0; + let row_count: usize = results.iter().map(|b| b.num_rows()).sum(); + println!( + "Query {query_id} iteration {i} took {ms:.1} ms and returned {row_count} rows" + ); + benchmark_run.write_iter(elapsed, row_count); + } + if self.common.debug { + ctx.sql(sql).await?.explain(false, false)?.show().await?; + } + benchmark_run.maybe_write_json(self.output_path.as_ref())?; + } + + Ok(()) + } + + async fn register_data(&self, ctx: &SessionContext) -> Result<()> { + let csv_options = Default::default(); + let parquet_options = Default::default(); + let path = self.path.as_os_str().to_str().unwrap(); + + if self.path.extension().map(|s| s == "csv").unwrap_or(false) { + ctx.register_csv("x", path, csv_options) + .await + .map_err(|e| { + DataFusionError::Context( + format!("Registering 'table' as {path}"), + Box::new(e), + ) + }) + .expect("error registering csv"); + } + + if self + .path + .extension() + .map(|s| s == "parquet") + .unwrap_or(false) + { + ctx.register_parquet("x", path, parquet_options) + .await + .map_err(|e| { + DataFusionError::Context( + format!("Registering 'table' as {path}"), + Box::new(e), + ) + }) + .expect("error registering parquet"); + } + Ok(()) + } +} + +struct AllQueries { + queries: Vec<String>, +} + +impl AllQueries { + fn try_new(path: &Path) -> Result<Self> { + let all_queries = std::fs::read_to_string(path) + .map_err(|e| exec_datafusion_err!("Could not open {path:?}: {e}"))?; + + Ok(Self { + queries: all_queries.lines().map(|s| s.to_string()).collect(), + }) + } + + /// Returns the text of query `query_id` + fn get_query(&self, query_id: usize) -> Result<&str> { + self.queries + .get(query_id - 1) + .ok_or_else(|| { + let min_id = self.min_query_id(); + let max_id = self.max_query_id(); + exec_datafusion_err!( + "Invalid query id {query_id}. Must be between {min_id} and {max_id}" + ) + }) + .map(|s| s.as_str()) + } + + fn min_query_id(&self) -> usize { + 1 + } + + fn max_query_id(&self) -> usize { + self.queries.len() + } +} diff --git a/benchmarks/src/lib.rs b/benchmarks/src/lib.rs index 2d37d78764..858a5b9df7 100644 --- a/benchmarks/src/lib.rs +++ b/benchmarks/src/lib.rs @@ -17,6 +17,7 @@ //! DataFusion benchmark runner pub mod clickbench; +pub mod h2o; pub mod imdb; pub mod parquet_filter; pub mod sort; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org