This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new a4dd1e2037 Add csv loading benchmarks. (#13544)
a4dd1e2037 is described below

commit a4dd1e203728bf5d43eb96b35a17a322d627cffe
Author: Daniel Hegberg <[email protected]>
AuthorDate: Wed Dec 4 20:13:47 2024 -0800

    Add csv loading benchmarks. (#13544)
    
    * Add csv loading benchmarks.
    
    * Fix fmt.
    
    * Fix clippy.
---
 .gitignore                           |  3 ++
 datafusion/core/Cargo.toml           |  4 ++
 datafusion/core/benches/csv_load.rs  | 81 ++++++++++++++++++++++++++++++++++++
 datafusion/core/src/test_util/csv.rs | 69 ++++++++++++++++++++++++++++++
 datafusion/core/src/test_util/mod.rs |  2 +
 test-utils/src/data_gen.rs           | 32 ++++++++++----
 6 files changed, 184 insertions(+), 7 deletions(-)

diff --git a/.gitignore b/.gitignore
index 8195760513..1fa79249ff 100644
--- a/.gitignore
+++ b/.gitignore
@@ -67,6 +67,9 @@ datafusion/sqllogictest/test_files/scratch*
 # temp file for core
 datafusion/core/*.parquet
 
+# Generated core benchmark data
+datafusion/core/benches/data/*
+
 # rat
 filtered_rat.txt
 rat.txt
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index 90b8abc622..48427f7ccd 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -159,6 +159,10 @@ nix = { version = "0.29.0", features = ["fs"] }
 harness = false
 name = "aggregate_query_sql"
 
+[[bench]]
+harness = false
+name = "csv_load"
+
 [[bench]]
 harness = false
 name = "distinct_query_sql"
diff --git a/datafusion/core/benches/csv_load.rs b/datafusion/core/benches/csv_load.rs
new file mode 100644
index 0000000000..5f707b31a6
--- /dev/null
+++ b/datafusion/core/benches/csv_load.rs
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#[macro_use]
+extern crate criterion;
+extern crate arrow;
+extern crate datafusion;
+
+mod data_utils;
+use crate::criterion::Criterion;
+use datafusion::error::Result;
+use datafusion::execution::context::SessionContext;
+use datafusion::prelude::CsvReadOptions;
+use datafusion::test_util::csv::TestCsvFile;
+use parking_lot::Mutex;
+use std::sync::Arc;
+use std::time::Duration;
+use test_utils::AccessLogGenerator;
+use tokio::runtime::Runtime;
+
+fn load_csv(ctx: Arc<Mutex<SessionContext>>, path: &str, options: CsvReadOptions) {
+    let rt = Runtime::new().unwrap();
+    let df = rt.block_on(ctx.lock().read_csv(path, options)).unwrap();
+    criterion::black_box(rt.block_on(df.collect()).unwrap());
+}
+
+fn create_context() -> Result<Arc<Mutex<SessionContext>>> {
+    let ctx = SessionContext::new();
+    Ok(Arc::new(Mutex::new(ctx)))
+}
+
+fn generate_test_file() -> TestCsvFile {
+    let write_location = std::env::current_dir()
+        .unwrap()
+        .join("benches")
+        .join("data");
+
+    // Make sure the write directory exists.
+    std::fs::create_dir_all(&write_location).unwrap();
+    let file_path = write_location.join("logs.csv");
+
+    let generator = AccessLogGenerator::new().with_include_nulls(true);
+    let num_batches = 2;
+    TestCsvFile::try_new(file_path.clone(), generator.take(num_batches as usize))
+        .expect("Failed to create test file.")
+}
+
+fn criterion_benchmark(c: &mut Criterion) {
+    let ctx = create_context().unwrap();
+    let test_file = generate_test_file();
+
+    let mut group = c.benchmark_group("load csv testing");
+    group.measurement_time(Duration::from_secs(20));
+
+    group.bench_function("default csv read options", |b| {
+        b.iter(|| {
+            load_csv(
+                ctx.clone(),
+                test_file.path().to_str().unwrap(),
+                CsvReadOptions::default(),
+            )
+        })
+    });
+}
+
+criterion_group!(benches, criterion_benchmark);
+criterion_main!(benches);
diff --git a/datafusion/core/src/test_util/csv.rs b/datafusion/core/src/test_util/csv.rs
new file mode 100644
index 0000000000..94c7efb954
--- /dev/null
+++ b/datafusion/core/src/test_util/csv.rs
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Helpers for writing csv files and reading them back
+
+use std::fs::File;
+use std::path::PathBuf;
+use std::sync::Arc;
+
+use crate::arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
+use crate::error::Result;
+
+use arrow::csv::WriterBuilder;
+
+///  a CSV file that has been created for testing.
+pub struct TestCsvFile {
+    path: PathBuf,
+    schema: SchemaRef,
+}
+
+impl TestCsvFile {
+    /// Creates a new csv file at the specified location
+    pub fn try_new(
+        path: PathBuf,
+        batches: impl IntoIterator<Item = RecordBatch>,
+    ) -> Result<Self> {
+        let file = File::create(&path).unwrap();
+        let builder = WriterBuilder::new().with_header(true);
+        let mut writer = builder.build(file);
+
+        let mut batches = batches.into_iter();
+        let first_batch = batches.next().expect("need at least one record batch");
+        let schema = first_batch.schema();
+
+        let mut num_rows = 0;
+        for batch in batches {
+            writer.write(&batch)?;
+            num_rows += batch.num_rows();
+        }
+
+        println!("Generated test dataset with {num_rows} rows");
+
+        Ok(Self { path, schema })
+    }
+
+    /// The schema of this csv file
+    pub fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
+    }
+
+    /// The path to the csv file
+    pub fn path(&self) -> &std::path::Path {
+        self.path.as_path()
+    }
+}
diff --git a/datafusion/core/src/test_util/mod.rs b/datafusion/core/src/test_util/mod.rs
index c4c84d667a..09608887c0 100644
--- a/datafusion/core/src/test_util/mod.rs
+++ b/datafusion/core/src/test_util/mod.rs
@@ -20,6 +20,8 @@
 #[cfg(feature = "parquet")]
 pub mod parquet;
 
+pub mod csv;
+
 use std::any::Any;
 use std::collections::HashMap;
 use std::fs::File;
diff --git a/test-utils/src/data_gen.rs b/test-utils/src/data_gen.rs
index 45ad51bb44..4227f2d9a7 100644
--- a/test-utils/src/data_gen.rs
+++ b/test-utils/src/data_gen.rs
@@ -33,6 +33,7 @@ struct GeneratorOptions {
     pods_per_host: Range<usize>,
     containers_per_pod: Range<usize>,
     entries_per_container: Range<usize>,
+    include_nulls: bool,
 }
 
 impl Default for GeneratorOptions {
@@ -42,6 +43,7 @@ impl Default for GeneratorOptions {
             pods_per_host: 1..15,
             containers_per_pod: 1..3,
             entries_per_container: 1024..8192,
+            include_nulls: false,
         }
     }
 }
@@ -149,13 +151,23 @@ impl BatchBuilder {
         self.image.append(image).unwrap();
         self.time.append_value(time);
 
-        self.client_addr.append_value(format!(
-            "{}.{}.{}.{}",
-            rng.gen::<u8>(),
-            rng.gen::<u8>(),
-            rng.gen::<u8>(),
-            rng.gen::<u8>()
-        ));
+        if self.options.include_nulls {
+            // Append a null value if the option is set
+            // Use both "NULL" as a string and a null value
+            if rng.gen_bool(0.5) {
+                self.client_addr.append_null();
+            } else {
+                self.client_addr.append_value("NULL");
+            }
+        } else {
+            self.client_addr.append_value(format!(
+                "{}.{}.{}.{}",
+                rng.gen::<u8>(),
+                rng.gen::<u8>(),
+                rng.gen::<u8>(),
+                rng.gen::<u8>()
+            ));
+        }
         self.request_duration.append_value(rng.gen());
         self.request_user_agent
             .append_value(random_string(rng, 20..100));
@@ -317,6 +329,12 @@ impl AccessLogGenerator {
         self.options.entries_per_container = range;
         self
     }
+
+    // Set the condition for null values in the generated data
+    pub fn with_include_nulls(mut self, include_nulls: bool) -> Self {
+        self.options.include_nulls = include_nulls;
+        self
+    }
 }
 
 impl Iterator for AccessLogGenerator {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to