This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 21fa511543 Benchmark query cancellation (#14818)
21fa511543 is described below

commit 21fa51154376c9872619b428adcc6e9e199556e8
Author: Carol (Nichols || Goulding) 
<[email protected]>
AuthorDate: Wed Feb 26 12:34:56 2025 -0500

    Benchmark query cancellation (#14818)
    
    Connects to #14036.
    
    This benchmark loads multiple files into an in-memory object store,
    starts a DataFusion query in a new Tokio runtime, lets the query run for
    a configurable amount of time, cancels the query, and measures how long
    it takes to drop the Tokio runtime.
    
    This demonstrates that DataFusion is likely not yielding often enough
    to allow for timely query cancellation and the freeing of all resources.
---
 Cargo.lock                     |   3 +
 benchmarks/Cargo.toml          |   3 +
 benchmarks/bench.sh            |  29 +++-
 benchmarks/src/bin/dfbench.rs  |  20 +--
 benchmarks/src/cancellation.rs | 329 +++++++++++++++++++++++++++++++++++++++++
 benchmarks/src/lib.rs          |   1 +
 6 files changed, 368 insertions(+), 17 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 4b4436c191..aca2c42866 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1793,13 +1793,16 @@ dependencies = [
  "futures",
  "log",
  "mimalloc",
+ "object_store",
  "parquet",
+ "rand 0.8.5",
  "serde",
  "serde_json",
  "snmalloc-rs",
  "structopt",
  "test-utils",
  "tokio",
+ "tokio-util",
 ]
 
 [[package]]
diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index 860089063c..18478de6c8 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -42,13 +42,16 @@ env_logger = { workspace = true }
 futures = { workspace = true }
 log = { workspace = true }
 mimalloc = { version = "0.1", optional = true, default-features = false }
+object_store = { workspace = true }
 parquet = { workspace = true, default-features = true }
+rand = { workspace = true }
 serde = { version = "1.0.218", features = ["derive"] }
 serde_json = { workspace = true }
 snmalloc-rs = { version = "0.3", optional = true }
 structopt = { version = "0.3", default-features = false }
 test-utils = { path = "../test-utils/", version = "0.1.0" }
 tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot"] }
+tokio-util = { version = "0.7.4" }
 
 [dev-dependencies]
 datafusion-proto = { workspace = true }
diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh
index 96c90aa1f6..1e56c96479 100755
--- a/benchmarks/bench.sh
+++ b/benchmarks/bench.sh
@@ -73,6 +73,7 @@ tpch:                   TPCH inspired benchmark on Scale 
Factor (SF) 1 (~1GB), s
 tpch_mem:               TPCH inspired benchmark on Scale Factor (SF) 1 (~1GB), 
query from memory
 tpch10:                 TPCH inspired benchmark on Scale Factor (SF) 10 
(~10GB), single parquet file per table, hash join
 tpch_mem10:             TPCH inspired benchmark on Scale Factor (SF) 10 
(~10GB), query from memory
+cancellation:           How long cancelling a query takes
 parquet:                Benchmark of parquet reader's filtering speed
 sort:                   Benchmark of sorting speed
 sort_tpch:              Benchmark of sorting speed for end-to-end sort queries 
on TPCH dataset
@@ -232,6 +233,7 @@ main() {
                     run_tpch_mem "1"
                     run_tpch "10"
                     run_tpch_mem "10"
+                    run_cancellation
                     run_parquet
                     run_sort
                     run_clickbench_1
@@ -255,6 +257,9 @@ main() {
                 tpch_mem10)
                     run_tpch_mem "10"
                     ;;
+                cancellation)
+                    run_cancellation
+                    ;;
                 parquet)
                     run_parquet
                     ;;
@@ -397,6 +402,14 @@ run_tpch_mem() {
     $CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path 
"${TPCH_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" -m --format parquet -o 
"${RESULTS_FILE}"
 }
 
+# Runs the cancellation benchmark
+run_cancellation() {
+    RESULTS_FILE="${RESULTS_DIR}/cancellation.json"
+    echo "RESULTS_FILE: ${RESULTS_FILE}"
+    echo "Running cancellation benchmark..."
+    $CARGO_COMMAND --bin dfbench -- cancellation --iterations 5 --path 
"${DATA_DIR}/cancellation" -o "${RESULTS_FILE}"
+}
+
 # Runs the parquet filter benchmark
 run_parquet() {
     RESULTS_FILE="${RESULTS_DIR}/parquet.json"
@@ -490,9 +503,9 @@ data_imdb() {
     local imdb_temp_gz="${imdb_dir}/imdb.tgz"
     local imdb_url="https://event.cwi.nl/da/job/imdb.tgz";
 
-   # imdb has 21 files, we just separate them into 3 groups for better 
readability 
+   # imdb has 21 files, we just separate them into 3 groups for better 
readability
     local first_required_files=(
-        "aka_name.parquet"    
+        "aka_name.parquet"
         "aka_title.parquet"
         "cast_info.parquet"
         "char_name.parquet"
@@ -539,13 +552,13 @@ data_imdb() {
     if [ "$convert_needed" = true ]; then
         # Expected size of the dataset
         expected_size="1263193115" # 1.18 GB
-        
+
         echo -n "Looking for imdb.tgz... "
         if [ -f "${imdb_temp_gz}" ]; then
             echo "found"
             echo -n "Checking size... "
             OUTPUT_SIZE=$(wc -c "${imdb_temp_gz}" 2>/dev/null | awk '{print 
$1}' || true)
-            
+
             #Checking the size of the existing file
             if [ "${OUTPUT_SIZE}" = "${expected_size}" ]; then
                 # Existing file is of the expected size, no need for download
@@ -559,7 +572,7 @@ data_imdb() {
 
                 # Download the dataset
                 curl -o "${imdb_temp_gz}" "${imdb_url}"
-                
+
                 # Size check of the installed file
                 DOWNLOADED_SIZE=$(wc -c "${imdb_temp_gz}" | awk '{print $1}')
                 if [ "${DOWNLOADED_SIZE}" != "${expected_size}" ]; then
@@ -591,7 +604,7 @@ data_imdb() {
 # Runs the imdb benchmark
 run_imdb() {
     IMDB_DIR="${DATA_DIR}/imdb"
-    
+
     RESULTS_FILE="${RESULTS_DIR}/imdb.json"
     echo "RESULTS_FILE: ${RESULTS_FILE}"
     echo "Running imdb benchmark..."
@@ -726,9 +739,9 @@ run_external_aggr() {
     echo "Running external aggregation benchmark..."
 
     # Only parquet is supported.
-    # Since per-operator memory limit is calculated as (total-memory-limit / 
+    # Since per-operator memory limit is calculated as (total-memory-limit /
     # number-of-partitions), and by default `--partitions` is set to number of
-    # CPU cores, we set a constant number of partitions to prevent this 
+    # CPU cores, we set a constant number of partitions to prevent this
     # benchmark to fail on some machines.
     $CARGO_COMMAND --bin external_aggr -- benchmark --partitions 4 
--iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}"
 }
diff --git a/benchmarks/src/bin/dfbench.rs b/benchmarks/src/bin/dfbench.rs
index db6c29f4a4..06337cb758 100644
--- a/benchmarks/src/bin/dfbench.rs
+++ b/benchmarks/src/bin/dfbench.rs
@@ -34,20 +34,21 @@ static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
 static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;
 
 use datafusion_benchmarks::{
-    clickbench, h2o, imdb, parquet_filter, sort, sort_tpch, tpch,
+    cancellation, clickbench, h2o, imdb, parquet_filter, sort, sort_tpch, tpch,
 };
 
 #[derive(Debug, StructOpt)]
 #[structopt(about = "benchmark command")]
 enum Options {
-    Tpch(tpch::RunOpt),
-    TpchConvert(tpch::ConvertOpt),
+    Cancellation(cancellation::RunOpt),
     Clickbench(clickbench::RunOpt),
+    H2o(h2o::RunOpt),
+    Imdb(imdb::RunOpt),
     ParquetFilter(parquet_filter::RunOpt),
     Sort(sort::RunOpt),
     SortTpch(sort_tpch::RunOpt),
-    Imdb(imdb::RunOpt),
-    H2o(h2o::RunOpt),
+    Tpch(tpch::RunOpt),
+    TpchConvert(tpch::ConvertOpt),
 }
 
 // Main benchmark runner entrypoint
@@ -56,13 +57,14 @@ pub async fn main() -> Result<()> {
     env_logger::init();
 
     match Options::from_args() {
-        Options::Tpch(opt) => opt.run().await,
-        Options::TpchConvert(opt) => opt.run().await,
+        Options::Cancellation(opt) => opt.run().await,
         Options::Clickbench(opt) => opt.run().await,
+        Options::H2o(opt) => opt.run().await,
+        Options::Imdb(opt) => opt.run().await,
         Options::ParquetFilter(opt) => opt.run().await,
         Options::Sort(opt) => opt.run().await,
         Options::SortTpch(opt) => opt.run().await,
-        Options::Imdb(opt) => opt.run().await,
-        Options::H2o(opt) => opt.run().await,
+        Options::Tpch(opt) => opt.run().await,
+        Options::TpchConvert(opt) => opt.run().await,
     }
 }
diff --git a/benchmarks/src/cancellation.rs b/benchmarks/src/cancellation.rs
new file mode 100644
index 0000000000..3c3ca424a3
--- /dev/null
+++ b/benchmarks/src/cancellation.rs
@@ -0,0 +1,329 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::path::{Path, PathBuf};
+use std::sync::Arc;
+use std::time::Duration;
+
+use crate::util::{BenchmarkRun, CommonOpt};
+
+use arrow::array::Array;
+use arrow::datatypes::DataType;
+use arrow::record_batch::RecordBatch;
+use datafusion::common::{Result, ScalarValue};
+use datafusion::datasource::file_format::parquet::ParquetFormat;
+use datafusion::datasource::file_format::FileFormat;
+use datafusion::datasource::listing::{ListingOptions, ListingTableUrl};
+use datafusion::execution::object_store::ObjectStoreUrl;
+use datafusion::execution::TaskContext;
+use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::*;
+use datafusion_common::instant::Instant;
+use futures::TryStreamExt;
+use object_store::ObjectStore;
+use parquet::arrow::async_writer::ParquetObjectWriter;
+use parquet::arrow::AsyncArrowWriter;
+use rand::distributions::Alphanumeric;
+use rand::rngs::ThreadRng;
+use rand::Rng;
+use structopt::StructOpt;
+use tokio::runtime::Runtime;
+use tokio_util::sync::CancellationToken;
+
+/// Test performance of cancelling queries
+///
+/// The queries are executed on a synthetic dataset generated during
+/// the benchmark execution that is an anonymized version of a
+/// real-world data set.
+///
+/// The query is an anonymized version of a real-world query, and the
+/// test starts the query then cancels it and reports how long it takes
+/// for the runtime to fully exit.
+#[derive(Debug, StructOpt, Clone)]
+#[structopt(verbatim_doc_comment)]
+pub struct RunOpt {
+    /// Common options
+    #[structopt(flatten)]
+    common: CommonOpt,
+
+    /// Path to folder where data will be generated
+    #[structopt(parse(from_os_str), required = true, short = "p", long = 
"path")]
+    path: PathBuf,
+
+    /// Path to machine readable output file
+    #[structopt(parse(from_os_str), short = "o", long = "output")]
+    output_path: Option<PathBuf>,
+
+    /// Number of files to generate
+    #[structopt(long = "num-files", default_value = "7")]
+    num_files: usize,
+
+    /// Number of rows per file to generate
+    #[structopt(long = "num-rows-per-file", default_value = "5000000")]
+    num_rows_per_file: usize,
+
+    /// How long to wait, in milliseconds, before attempting to cancel
+    #[structopt(long = "wait-time", default_value = "100")]
+    wait_time: u64,
+}
+
+impl RunOpt {
+    pub async fn run(self) -> Result<()> {
+        let files_on_disk =
+            find_or_generate_files(&self.path, self.num_files, 
self.num_rows_per_file)
+                .await?;
+
+        // Using an in-memory object store is important for this benchmark to 
ensure `datafusion`
+        // is yielding often enough regardless of whether the file reading 
happens to be yielding.
+        let store =
+            Arc::new(object_store::memory::InMemory::new()) as Arc<dyn 
ObjectStore>;
+        println!("Starting to load data into in-memory object store");
+        load_data(Arc::clone(&store), &files_on_disk).await?;
+        println!("Done loading data into in-memory object store");
+
+        let mut rundata = BenchmarkRun::new();
+        rundata.start_new_case("Arglebargle");
+
+        for i in 0..self.common.iterations {
+            let elapsed = run_test(self.wait_time, Arc::clone(&store))?;
+            let ms = elapsed.as_secs_f64() * 1000.0;
+            println!("Iteration {i} cancelled in {ms} ms");
+            rundata.write_iter(elapsed, 0);
+        }
+
+        rundata.maybe_write_json(self.output_path.as_ref())?;
+
+        Ok(())
+    }
+}
+
+fn run_test(wait_time: u64, store: Arc<dyn ObjectStore>) -> Result<Duration> {
+    std::thread::spawn(move || {
+        let token = CancellationToken::new();
+        let captured_token = token.clone();
+
+        let rt = Runtime::new()?;
+        rt.spawn(async move {
+            println!("Starting spawned");
+            loop {
+                let store = Arc::clone(&store);
+                tokio::select! {
+                    biased;
+                    _ = async move {
+                        datafusion(store).await.unwrap();
+                    } => {
+                        println!("matched case doing work");
+                    },
+                    _ = captured_token.cancelled() => {
+                        println!("Received shutdown request");
+                        return;
+                    },
+                }
+            }
+        });
+
+        println!("in main, sleeping");
+        std::thread::sleep(Duration::from_millis(wait_time));
+
+        let start = Instant::now();
+
+        println!("cancelling thread");
+        token.cancel();
+
+        drop(rt);
+
+        let elapsed = start.elapsed();
+        println!("done dropping runtime in {elapsed:?}");
+
+        Ok(elapsed)
+    })
+    .join()
+    .unwrap()
+}
+
+async fn datafusion(store: Arc<dyn ObjectStore>) -> Result<()> {
+    let query = "SELECT distinct \"A\", \"B\", \"C\", \"D\", \"E\" FROM 
\"test_table\"";
+
+    let config = SessionConfig::new()
+        .with_target_partitions(4)
+        .set_bool("datafusion.execution.parquet.pushdown_filters", true);
+    let ctx = SessionContext::new_with_config(config);
+    let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
+    ctx.register_object_store(object_store_url.as_ref(), Arc::clone(&store));
+
+    let file_format = ParquetFormat::default().with_enable_pruning(true);
+    let listing_options = ListingOptions::new(Arc::new(file_format))
+        .with_file_extension(ParquetFormat::default().get_ext());
+
+    let table_path = ListingTableUrl::parse("test:///data/")?;
+
+    ctx.register_listing_table(
+        "test_table",
+        &table_path,
+        listing_options.clone(),
+        None,
+        None,
+    )
+    .await?;
+
+    println!("Creating logical plan...");
+    let logical_plan = ctx.state().create_logical_plan(query).await?;
+
+    println!("Creating physical plan...");
+    let physical_plan = Arc::new(CoalescePartitionsExec::new(
+        ctx.state().create_physical_plan(&logical_plan).await?,
+    ));
+
+    println!("Executing physical plan...");
+    let partition = 0;
+    let task_context = Arc::new(TaskContext::from(&ctx));
+    let stream = physical_plan.execute(partition, task_context).unwrap();
+
+    println!("Getting results...");
+    let results: Vec<_> = stream.try_collect().await?;
+    println!("Got {} record batches", results.len());
+
+    Ok(())
+}
+
+async fn find_or_generate_files(
+    data_dir: impl AsRef<Path>,
+    num_files: usize,
+    num_rows_per_file: usize,
+) -> Result<Vec<PathBuf>> {
+    // Ignore errors if the directory already exists
+    let _ = std::fs::create_dir_all(data_dir.as_ref());
+    let files_on_disk = find_files_on_disk(data_dir.as_ref())?;
+
+    if files_on_disk.is_empty() {
+        println!("No data files found, generating (this will take a bit)");
+        generate_data(data_dir.as_ref(), num_files, num_rows_per_file).await?;
+        println!("Done generating files");
+        let files_on_disk = find_files_on_disk(data_dir)?;
+
+        if files_on_disk.is_empty() {
+            panic!("Tried to generate data files but there are still no files 
on disk");
+        } else {
+            println!("Using {} files now on disk", files_on_disk.len());
+            Ok(files_on_disk)
+        }
+    } else {
+        println!("Using {} files found on disk", files_on_disk.len());
+        Ok(files_on_disk)
+    }
+}
+
+fn find_files_on_disk(data_dir: impl AsRef<Path>) -> Result<Vec<PathBuf>> {
+    Ok(std::fs::read_dir(&data_dir)?
+        .filter_map(|file| {
+            let path = file.unwrap().path();
+            if path
+                .extension()
+                .map(|ext| (ext == "parquet"))
+                .unwrap_or(false)
+            {
+                Some(path)
+            } else {
+                None
+            }
+        })
+        .collect())
+}
+
+async fn load_data(
+    store: Arc<dyn ObjectStore>,
+    files_on_disk: &[PathBuf],
+) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
+    for file in files_on_disk {
+        let bytes = std::fs::read(file)?;
+
+        let path = object_store::path::Path::from_iter([
+            "data",
+            file.file_name().unwrap().to_str().unwrap(),
+        ]);
+        let payload = object_store::PutPayload::from_bytes(bytes.into());
+        store
+            .put_opts(&path, payload, object_store::PutOptions::default())
+            .await?;
+    }
+
+    Ok(())
+}
+
+async fn generate_data(
+    data_dir: impl AsRef<Path>,
+    num_files: usize,
+    num_rows_per_file: usize,
+) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
+    let absolute = std::env::current_dir().unwrap().join(data_dir);
+    let store = Arc::new(object_store::local::LocalFileSystem::new_with_prefix(
+        absolute,
+    )?);
+
+    let columns = [
+        ("A", DataType::Float64),
+        ("B", DataType::Float64),
+        ("C", DataType::Float64),
+        ("D", DataType::Boolean),
+        ("E", DataType::Utf8),
+        ("F", DataType::Utf8),
+        ("G", DataType::Utf8),
+        ("H", DataType::Utf8),
+        ("I", DataType::Utf8),
+        ("J", DataType::Utf8),
+        ("K", DataType::Utf8),
+    ];
+
+    for file_num in 1..=num_files {
+        println!("Generating file {file_num} of {num_files}");
+        let data = columns.iter().map(|(column_name, column_type)| {
+            let column = random_data(column_type, num_rows_per_file);
+            (column_name, column)
+        });
+        let to_write = RecordBatch::try_from_iter(data).unwrap();
+        let path = 
object_store::path::Path::from(format!("{file_num}.parquet").as_str());
+        let object_store_writer = ParquetObjectWriter::new(Arc::clone(&store) 
as _, path);
+
+        let mut writer =
+            AsyncArrowWriter::try_new(object_store_writer, to_write.schema(), 
None)?;
+        writer.write(&to_write).await?;
+        writer.close().await?;
+    }
+
+    Ok(())
+}
+
+fn random_data(column_type: &DataType, rows: usize) -> Arc<dyn Array> {
+    let mut rng = rand::thread_rng();
+    let values = (0..rows).map(|_| random_value(&mut rng, column_type));
+    ScalarValue::iter_to_array(values).unwrap()
+}
+
+fn random_value(rng: &mut ThreadRng, column_type: &DataType) -> ScalarValue {
+    match column_type {
+        DataType::Float64 => ScalarValue::Float64(Some(rng.gen())),
+        DataType::Boolean => ScalarValue::Boolean(Some(rng.gen())),
+        DataType::Utf8 => ScalarValue::Utf8(Some(
+            rng.sample_iter(&Alphanumeric)
+                .take(10)
+                .map(char::from)
+                .collect(),
+        )),
+        other => unimplemented!("No random value generation implemented for 
{other:?}"),
+    }
+}
diff --git a/benchmarks/src/lib.rs b/benchmarks/src/lib.rs
index 858a5b9df7..a402fc1b8c 100644
--- a/benchmarks/src/lib.rs
+++ b/benchmarks/src/lib.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 //! DataFusion benchmark runner
+pub mod cancellation;
 pub mod clickbench;
 pub mod h2o;
 pub mod imdb;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to