This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new fb39d5d9f Add benchmarks for testing row filtering (#3769)
fb39d5d9f is described below

commit fb39d5d9fcab5cdab69e35505b877048a73a2e2e
Author: Dan Harris <[email protected]>
AuthorDate: Wed Oct 12 06:44:55 2022 -0400

    Add benchmarks for testing row filtering (#3769)
    
    * Add benchmarks for testing row filtering
    
    * Rework to test everything at once
    
    * Make sure we get the same results with pushdown disabled
    
    * Add license header
    
    * Update benchmarks/README.md
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    * PR comments
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 benchmarks/Cargo.toml                         |   3 +
 benchmarks/README.md                          |  34 ++
 benchmarks/src/bin/parquet_filter_pushdown.rs | 487 ++++++++++++++++++++++++++
 3 files changed, 524 insertions(+)

diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index 7367e9682..c07344508 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -32,11 +32,14 @@ simd = ["datafusion/simd"]
 snmalloc = ["snmalloc-rs"]
 
 [dependencies]
+arrow = "24.0.0"
 datafusion = { path = "../datafusion/core" }
 env_logger = "0.9"
 futures = "0.3"
 mimalloc = { version = "0.1", optional = true, default-features = false }
 num_cpus = "1.13.0"
+object_store = "0.5.0"
+parquet = "24.0.0"
 rand = "0.8.4"
 serde = { version = "1.0.136", features = ["derive"] }
 serde_json = "1.0.78"
diff --git a/benchmarks/README.md b/benchmarks/README.md
index 505469fc5..97a0bd4c6 100644
--- a/benchmarks/README.md
+++ b/benchmarks/README.md
@@ -126,3 +126,37 @@ h2o groupby query 1 took 1669 ms
 
 [1]: http://www.tpc.org/tpch/
 [2]: https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page
+
+## Parquet filter pushdown benchmarks
+
+This is a set of benchmarks for testing and verifying performance of parquet 
filter pushdown. The queries are executed on
+a synthetic dataset generated during the benchmark execution and designed to 
simulate web server access logs. 
+
+```base
+cargo run --release --bin parquet_filter_pushdown --  --path ./data 
--scale-factor 1.0
+```
+
+This will generate the synthetic dataset at `./data/logs.parquet`. The size of 
the dataset can be controlled through the `size_factor`
+(with the default value of `1.0` generating a ~1GB parquet file). 
+
+For each filter we will run the query using different `ParquetScanOption` 
settings. 
+
+Example run:
+```
+Running benchmarks with the following options: Opt { debug: false, iterations: 
3, partitions: 2, path: "./data", batch_size: 8192, scale_factor: 1.0 }
+Generated test dataset with 10699521 rows
+Executing with filter 'request_method = Utf8("GET")'
+Using scan options ParquetScanOptions { pushdown_filters: false, 
reorder_predicates: false, enable_page_index: false }
+Iteration 0 returned 10699521 rows in 1303 ms
+Iteration 1 returned 10699521 rows in 1288 ms
+Iteration 2 returned 10699521 rows in 1266 ms
+Using scan options ParquetScanOptions { pushdown_filters: true, 
reorder_predicates: true, enable_page_index: true }
+Iteration 0 returned 1781686 rows in 1970 ms
+Iteration 1 returned 1781686 rows in 2002 ms
+Iteration 2 returned 1781686 rows in 1988 ms
+Using scan options ParquetScanOptions { pushdown_filters: true, 
reorder_predicates: false, enable_page_index: true }
+Iteration 0 returned 1781686 rows in 1940 ms
+Iteration 1 returned 1781686 rows in 1986 ms
+Iteration 2 returned 1781686 rows in 1947 ms
+...
+```
\ No newline at end of file
diff --git a/benchmarks/src/bin/parquet_filter_pushdown.rs 
b/benchmarks/src/bin/parquet_filter_pushdown.rs
new file mode 100644
index 000000000..e3b365d4f
--- /dev/null
+++ b/benchmarks/src/bin/parquet_filter_pushdown.rs
@@ -0,0 +1,487 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{
+    Int32Builder, StringBuilder, StringDictionaryBuilder, 
TimestampNanosecondBuilder,
+    UInt16Builder,
+};
+use arrow::datatypes::{DataType, Field, Int32Type, Schema, SchemaRef, 
TimeUnit};
+use arrow::record_batch::RecordBatch;
+use arrow::util::pretty;
+use datafusion::common::Result;
+use datafusion::datasource::listing::{ListingTableUrl, PartitionedFile};
+use datafusion::datasource::object_store::ObjectStoreUrl;
+use datafusion::execution::context::ExecutionProps;
+use datafusion::logical_expr::{lit, or, Expr};
+use datafusion::logical_plan::ToDFSchema;
+use datafusion::physical_expr::create_physical_expr;
+use datafusion::physical_plan::collect;
+use datafusion::physical_plan::file_format::{
+    FileScanConfig, ParquetExec, ParquetScanOptions,
+};
+use datafusion::physical_plan::filter::FilterExec;
+use datafusion::prelude::{col, combine_filters, SessionConfig, SessionContext};
+use object_store::path::Path;
+use object_store::ObjectMeta;
+use parquet::arrow::ArrowWriter;
+use parquet::file::properties::WriterProperties;
+use rand::rngs::StdRng;
+use rand::{Rng, SeedableRng};
+use std::fs::File;
+use std::ops::Range;
+use std::path::PathBuf;
+use std::sync::Arc;
+use std::time::Instant;
+use structopt::StructOpt;
+
+#[cfg(feature = "snmalloc")]
+#[global_allocator]
+static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
+
+#[derive(Debug, StructOpt)]
+#[structopt(name = "Benchmarks", about = "Apache Arrow Rust Benchmarks.")]
+struct Opt {
+    /// Activate debug mode to see query results
+    #[structopt(short, long)]
+    debug: bool,
+
+    /// Number of iterations of each test run
+    #[structopt(short = "i", long = "iterations", default_value = "3")]
+    iterations: usize,
+
+    /// Number of partitions to process in parallel
+    #[structopt(long = "partitions", default_value = "2")]
+    partitions: usize,
+
+    /// Path to folder where access log file will be generated
+    #[structopt(parse(from_os_str), required = true, short = "p", long = 
"path")]
+    path: PathBuf,
+
+    /// Data page size of the generated parquet file
+    #[structopt(long = "page-size")]
+    page_size: Option<usize>,
+
+    /// Data page size of the generated parquet file
+    #[structopt(long = "row-group-size")]
+    row_group_size: Option<usize>,
+
+    /// Total size of generated dataset. The default scale factor of 1.0 will 
generate a roughly 1GB parquet file
+    #[structopt(short = "s", long = "scale-factor", default_value = "1.0")]
+    scale_factor: f32,
+}
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    let opt: Opt = Opt::from_args();
+    println!("Running benchmarks with the following options: {:?}", opt);
+
+    let config = SessionConfig::new().with_target_partitions(opt.partitions);
+    let mut ctx = SessionContext::with_config(config);
+
+    let path = opt.path.join("logs.parquet");
+
+    let (object_store_url, object_meta) =
+        gen_data(path, opt.scale_factor, opt.page_size, opt.row_group_size)?;
+
+    run_benchmarks(
+        &mut ctx,
+        object_store_url.clone(),
+        object_meta.clone(),
+        opt.iterations,
+        opt.debug,
+    )
+    .await?;
+
+    Ok(())
+}
+
+async fn run_benchmarks(
+    ctx: &mut SessionContext,
+    object_store_url: ObjectStoreUrl,
+    object_meta: ObjectMeta,
+    iterations: usize,
+    debug: bool,
+) -> Result<()> {
+    let scan_options_matrix = vec![
+        ParquetScanOptions::default(),
+        ParquetScanOptions::default()
+            .with_page_index(true)
+            .with_pushdown_filters(true)
+            .with_reorder_predicates(true),
+        ParquetScanOptions::default()
+            .with_page_index(true)
+            .with_pushdown_filters(true)
+            .with_reorder_predicates(false),
+    ];
+
+    let filter_matrix = vec![
+        // Selective-ish filter
+        col("request_method").eq(lit("GET")),
+        // Non-selective filter
+        col("request_method").not_eq(lit("GET")),
+        // Basic conjunction
+        col("request_method")
+            .eq(lit("POST"))
+            .and(col("response_status").eq(lit(503_u16))),
+        // Nested filters
+        col("request_method").eq(lit("POST")).and(or(
+            col("response_status").eq(lit(503_u16)),
+            col("response_status").eq(lit(403_u16)),
+        )),
+        // Many filters
+        combine_filters(&[
+            col("request_method").not_eq(lit("GET")),
+            col("response_status").eq(lit(400_u16)),
+            // TODO this fails in the FilterExec with Error: Internal("The 
type of Dictionary(Int32, Utf8) = Utf8 of binary physical should be same")
+            // col("service").eq(lit("backend")),
+        ])
+        .unwrap(),
+        // Filter everything
+        col("response_status").eq(lit(429_u16)),
+        // Filter nothing
+        col("response_status").gt(lit(0_u16)),
+    ];
+
+    for filter_expr in &filter_matrix {
+        println!("Executing with filter '{}'", filter_expr);
+        for scan_options in &scan_options_matrix {
+            println!("Using scan options {:?}", scan_options);
+            for i in 0..iterations {
+                let start = Instant::now();
+                let rows = exec_scan(
+                    ctx,
+                    object_store_url.clone(),
+                    object_meta.clone(),
+                    filter_expr.clone(),
+                    scan_options.clone(),
+                    debug,
+                )
+                .await?;
+                println!(
+                    "Iteration {} returned {} rows in {} ms",
+                    i,
+                    rows,
+                    start.elapsed().as_millis()
+                );
+            }
+        }
+        println!("\n");
+    }
+    Ok(())
+}
+
+async fn exec_scan(
+    ctx: &SessionContext,
+    object_store_url: ObjectStoreUrl,
+    object_meta: ObjectMeta,
+    filter: Expr,
+    scan_options: ParquetScanOptions,
+    debug: bool,
+) -> Result<usize> {
+    let schema = BatchBuilder::schema();
+    let scan_config = FileScanConfig {
+        object_store_url,
+        file_schema: schema.clone(),
+        file_groups: vec![vec![PartitionedFile {
+            object_meta,
+            partition_values: vec![],
+            range: None,
+            extensions: None,
+        }]],
+        statistics: Default::default(),
+        projection: None,
+        limit: None,
+        table_partition_cols: vec![],
+    };
+
+    let df_schema = schema.clone().to_dfschema()?;
+
+    let physical_filter_expr = create_physical_expr(
+        &filter,
+        &df_schema,
+        schema.as_ref(),
+        &ExecutionProps::default(),
+    )?;
+
+    let parquet_exec = Arc::new(
+        ParquetExec::new(scan_config, Some(filter), 
None).with_scan_options(scan_options),
+    );
+
+    let exec = Arc::new(FilterExec::try_new(physical_filter_expr, 
parquet_exec)?);
+
+    let task_ctx = ctx.task_ctx();
+    let result = collect(exec, task_ctx).await?;
+
+    if debug {
+        pretty::print_batches(&result)?;
+    }
+    Ok(result.iter().map(|b| b.num_rows()).sum())
+}
+
+fn gen_data(
+    path: PathBuf,
+    scale_factor: f32,
+    page_size: Option<usize>,
+    row_group_size: Option<usize>,
+) -> Result<(ObjectStoreUrl, ObjectMeta)> {
+    let generator = Generator::new();
+
+    let file = File::create(&path).unwrap();
+
+    let mut props_builder = WriterProperties::builder();
+
+    if let Some(s) = page_size {
+        props_builder = props_builder
+            .set_data_pagesize_limit(s)
+            .set_write_batch_size(s);
+    }
+
+    if let Some(s) = row_group_size {
+        props_builder = props_builder.set_max_row_group_size(s);
+    }
+
+    let mut writer =
+        ArrowWriter::try_new(file, generator.schema.clone(), 
Some(props_builder.build()))
+            .unwrap();
+
+    let mut num_rows = 0;
+
+    let num_batches = 100_f32 * scale_factor;
+
+    for batch in generator.take(num_batches as usize) {
+        writer.write(&batch).unwrap();
+        writer.flush()?;
+        num_rows += batch.num_rows();
+    }
+    writer.close().unwrap();
+
+    println!("Generated test dataset with {} rows", num_rows);
+
+    let size = std::fs::metadata(&path)?.len() as usize;
+
+    let canonical_path = path.canonicalize()?;
+
+    let object_store_url =
+        ListingTableUrl::parse(canonical_path.to_str().unwrap_or_default())?
+            .object_store();
+
+    let object_meta = ObjectMeta {
+        location: Path::parse(canonical_path.to_str().unwrap_or_default())?,
+        last_modified: Default::default(),
+        size,
+    };
+
+    Ok((object_store_url, object_meta))
+}
+
+#[derive(Default)]
+struct BatchBuilder {
+    service: StringDictionaryBuilder<Int32Type>,
+    host: StringDictionaryBuilder<Int32Type>,
+    pod: StringDictionaryBuilder<Int32Type>,
+    container: StringDictionaryBuilder<Int32Type>,
+    image: StringDictionaryBuilder<Int32Type>,
+    time: TimestampNanosecondBuilder,
+    client_addr: StringBuilder,
+    request_duration: Int32Builder,
+    request_user_agent: StringBuilder,
+    request_method: StringBuilder,
+    request_host: StringBuilder,
+    request_bytes: Int32Builder,
+    response_bytes: Int32Builder,
+    response_status: UInt16Builder,
+}
+
+impl BatchBuilder {
+    fn schema() -> SchemaRef {
+        let utf8_dict =
+            || DataType::Dictionary(Box::new(DataType::Int32), 
Box::new(DataType::Utf8));
+
+        Arc::new(Schema::new(vec![
+            Field::new("service", utf8_dict(), true),
+            Field::new("host", utf8_dict(), false),
+            Field::new("pod", utf8_dict(), false),
+            Field::new("container", utf8_dict(), false),
+            Field::new("image", utf8_dict(), false),
+            Field::new(
+                "time",
+                DataType::Timestamp(TimeUnit::Nanosecond, None),
+                false,
+            ),
+            Field::new("client_addr", DataType::Utf8, true),
+            Field::new("request_duration_ns", DataType::Int32, false),
+            Field::new("request_user_agent", DataType::Utf8, true),
+            Field::new("request_method", DataType::Utf8, true),
+            Field::new("request_host", DataType::Utf8, true),
+            Field::new("request_bytes", DataType::Int32, true),
+            Field::new("response_bytes", DataType::Int32, true),
+            Field::new("response_status", DataType::UInt16, false),
+        ]))
+    }
+
+    fn append(&mut self, rng: &mut StdRng, host: &str, service: &str) {
+        let num_pods = rng.gen_range(1..15);
+        let pods = generate_sorted_strings(rng, num_pods, 30..40);
+        for pod in pods {
+            for container_idx in 0..rng.gen_range(1..3) {
+                let container = format!("{}_container_{}", service, 
container_idx);
+                let image = format!(
+                    
"{}@sha256:30375999bf03beec2187843017b10c9e88d8b1a91615df4eb6350fb39472edd9",
+                    container
+                );
+
+                let num_entries = rng.gen_range(1024..8192);
+                for i in 0..num_entries {
+                    let time = i as i64 * 1024;
+                    self.append_row(rng, host, &pod, service, &container, 
&image, time);
+                }
+            }
+        }
+    }
+
+    #[allow(clippy::too_many_arguments)]
+    fn append_row(
+        &mut self,
+        rng: &mut StdRng,
+        host: &str,
+        pod: &str,
+        service: &str,
+        container: &str,
+        image: &str,
+        time: i64,
+    ) {
+        let methods = &["GET", "PUT", "POST", "HEAD", "PATCH", "DELETE"];
+        let status = &[200, 204, 400, 503, 403];
+
+        self.service.append(service).unwrap();
+        self.host.append(host).unwrap();
+        self.pod.append(pod).unwrap();
+        self.container.append(container).unwrap();
+        self.image.append(image).unwrap();
+        self.time.append_value(time);
+
+        self.client_addr.append_value(format!(
+            "{}.{}.{}.{}",
+            rng.gen::<u8>(),
+            rng.gen::<u8>(),
+            rng.gen::<u8>(),
+            rng.gen::<u8>()
+        ));
+        self.request_duration.append_value(rng.gen());
+        self.request_user_agent
+            .append_value(random_string(rng, 20..100));
+        self.request_method
+            .append_value(methods[rng.gen_range(0..methods.len())]);
+        self.request_host
+            .append_value(format!("https://{}.mydomain.com";, service));
+
+        self.request_bytes
+            .append_option(rng.gen_bool(0.9).then(|| rng.gen()));
+        self.response_bytes
+            .append_option(rng.gen_bool(0.9).then(|| rng.gen()));
+        self.response_status
+            .append_value(status[rng.gen_range(0..status.len())]);
+    }
+
+    fn finish(mut self, schema: SchemaRef) -> RecordBatch {
+        RecordBatch::try_new(
+            schema,
+            vec![
+                Arc::new(self.service.finish()),
+                Arc::new(self.host.finish()),
+                Arc::new(self.pod.finish()),
+                Arc::new(self.container.finish()),
+                Arc::new(self.image.finish()),
+                Arc::new(self.time.finish()),
+                Arc::new(self.client_addr.finish()),
+                Arc::new(self.request_duration.finish()),
+                Arc::new(self.request_user_agent.finish()),
+                Arc::new(self.request_method.finish()),
+                Arc::new(self.request_host.finish()),
+                Arc::new(self.request_bytes.finish()),
+                Arc::new(self.response_bytes.finish()),
+                Arc::new(self.response_status.finish()),
+            ],
+        )
+        .unwrap()
+    }
+}
+
+fn random_string(rng: &mut StdRng, len_range: Range<usize>) -> String {
+    let len = rng.gen_range(len_range);
+    (0..len)
+        .map(|_| rng.gen_range(b'a'..=b'z') as char)
+        .collect::<String>()
+}
+
+fn generate_sorted_strings(
+    rng: &mut StdRng,
+    count: usize,
+    str_len: Range<usize>,
+) -> Vec<String> {
+    let mut strings: Vec<_> = (0..count)
+        .map(|_| random_string(rng, str_len.clone()))
+        .collect();
+
+    strings.sort_unstable();
+    strings
+}
+
+/// Generates sorted RecordBatch with an access log style schema for a single 
host
+#[derive(Debug)]
+struct Generator {
+    schema: SchemaRef,
+    rng: StdRng,
+    host_idx: usize,
+}
+
+impl Generator {
+    fn new() -> Self {
+        let seed = [
+            1, 0, 0, 0, 23, 0, 3, 0, 200, 1, 0, 0, 210, 30, 8, 0, 1, 0, 21, 0, 
6, 0, 0,
+            0, 0, 0, 5, 0, 0, 0, 0, 0,
+        ];
+
+        Self {
+            schema: BatchBuilder::schema(),
+            host_idx: 0,
+            rng: StdRng::from_seed(seed),
+        }
+    }
+}
+
+impl Iterator for Generator {
+    type Item = RecordBatch;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        let mut builder = BatchBuilder::default();
+
+        let host = format!(
+            "i-{:016x}.ec2.internal",
+            self.host_idx * 0x7d87f8ed5c5 + 0x1ec3ca3151468928
+        );
+        self.host_idx += 1;
+
+        for service in &["frontend", "backend", "database", "cache"] {
+            if self.rng.gen_bool(0.5) {
+                continue;
+            }
+            builder.append(&mut self.rng, &host, service);
+        }
+        Some(builder.finish(Arc::clone(&self.schema)))
+    }
+}

Reply via email to