This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new a4dd1e2037 Add csv loading benchmarks. (#13544)
a4dd1e2037 is described below
commit a4dd1e203728bf5d43eb96b35a17a322d627cffe
Author: Daniel Hegberg <[email protected]>
AuthorDate: Wed Dec 4 20:13:47 2024 -0800
Add csv loading benchmarks. (#13544)
* Add csv loading benchmarks.
* Fix fmt.
* Fix clippy.
---
.gitignore | 3 ++
datafusion/core/Cargo.toml | 4 ++
datafusion/core/benches/csv_load.rs | 81 ++++++++++++++++++++++++++++++++++++
datafusion/core/src/test_util/csv.rs | 69 ++++++++++++++++++++++++++++++
datafusion/core/src/test_util/mod.rs | 2 +
test-utils/src/data_gen.rs | 32 ++++++++++----
6 files changed, 184 insertions(+), 7 deletions(-)
diff --git a/.gitignore b/.gitignore
index 8195760513..1fa79249ff 100644
--- a/.gitignore
+++ b/.gitignore
@@ -67,6 +67,9 @@ datafusion/sqllogictest/test_files/scratch*
# temp file for core
datafusion/core/*.parquet
+# Generated core benchmark data
+datafusion/core/benches/data/*
+
# rat
filtered_rat.txt
rat.txt
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index 90b8abc622..48427f7ccd 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -159,6 +159,10 @@ nix = { version = "0.29.0", features = ["fs"] }
harness = false
name = "aggregate_query_sql"
+[[bench]]
+harness = false
+name = "csv_load"
+
[[bench]]
harness = false
name = "distinct_query_sql"
diff --git a/datafusion/core/benches/csv_load.rs b/datafusion/core/benches/csv_load.rs
new file mode 100644
index 0000000000..5f707b31a6
--- /dev/null
+++ b/datafusion/core/benches/csv_load.rs
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#[macro_use]
+extern crate criterion;
+extern crate arrow;
+extern crate datafusion;
+
+mod data_utils;
+use crate::criterion::Criterion;
+use datafusion::error::Result;
+use datafusion::execution::context::SessionContext;
+use datafusion::prelude::CsvReadOptions;
+use datafusion::test_util::csv::TestCsvFile;
+use parking_lot::Mutex;
+use std::sync::Arc;
+use std::time::Duration;
+use test_utils::AccessLogGenerator;
+use tokio::runtime::Runtime;
+
+/// Reads the csv file at `path` using `options` and collects every record
+/// batch, so that the complete parse is part of the measured work.
+fn load_csv(ctx: Arc<Mutex<SessionContext>>, path: &str, options: CsvReadOptions) {
+    // Build the tokio runtime once and reuse it for every iteration. Creating
+    // a multi-threaded runtime per call would add worker-thread start-up and
+    // shutdown cost to each measured sample.
+    static RUNTIME: std::sync::OnceLock<Runtime> = std::sync::OnceLock::new();
+    let rt = RUNTIME.get_or_init(|| Runtime::new().unwrap());
+
+    let df = rt.block_on(ctx.lock().read_csv(path, options)).unwrap();
+    criterion::black_box(rt.block_on(df.collect()).unwrap());
+}
+
+/// Builds a default `SessionContext`, wrapped so it can be shared by every
+/// benchmark iteration.
+fn create_context() -> Result<Arc<Mutex<SessionContext>>> {
+    let shared = Arc::new(Mutex::new(SessionContext::new()));
+    Ok(shared)
+}
+
+/// Writes a freshly generated access-log dataset (with null `client_addr`
+/// values enabled) to `benches/data/logs.csv` under this crate and returns a
+/// handle to the written file.
+fn generate_test_file() -> TestCsvFile {
+    // Anchor the output at the crate root instead of the process's current
+    // working directory, so the file always lands in `benches/data` no matter
+    // where the benchmark binary is launched from.
+    let write_location = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
+        .join("benches")
+        .join("data");
+
+    // Make sure the write directory exists.
+    std::fs::create_dir_all(&write_location).unwrap();
+    let file_path = write_location.join("logs.csv");
+
+    let generator = AccessLogGenerator::new().with_include_nulls(true);
+    let num_batches: usize = 2;
+    TestCsvFile::try_new(file_path, generator.take(num_batches))
+        .expect("Failed to create test file.")
+}
+
+/// Registers the csv loading benchmarks with criterion.
+fn criterion_benchmark(c: &mut Criterion) {
+    let ctx = create_context().unwrap();
+    let test_file = generate_test_file();
+    // Resolve the path string once, outside the measured closure, so that
+    // only the csv load itself is timed.
+    let path = test_file.path().to_str().unwrap();
+
+    let mut group = c.benchmark_group("load csv testing");
+    group.measurement_time(Duration::from_secs(20));
+
+    group.bench_function("default csv read options", |b| {
+        b.iter(|| load_csv(Arc::clone(&ctx), path, CsvReadOptions::default()))
+    });
+
+    // Explicitly finish the group so its summary report is generated here.
+    group.finish();
+}
+
+criterion_group!(benches, criterion_benchmark);
+criterion_main!(benches);
diff --git a/datafusion/core/src/test_util/csv.rs b/datafusion/core/src/test_util/csv.rs
new file mode 100644
index 0000000000..94c7efb954
--- /dev/null
+++ b/datafusion/core/src/test_util/csv.rs
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Helpers for writing csv files and reading them back
+
+use std::fs::File;
+use std::path::PathBuf;
+use std::sync::Arc;
+
+use crate::arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
+use crate::error::Result;
+
+use arrow::csv::WriterBuilder;
+
+/// A CSV file that has been created for testing.
+#[derive(Debug)]
+pub struct TestCsvFile {
+    path: PathBuf,
+    schema: SchemaRef,
+}
+
+impl TestCsvFile {
+    /// Creates a new csv file at `path` containing a header row followed by
+    /// every row of every batch yielded by `batches`.
+    ///
+    /// The recorded schema is taken from the first batch. Callers should pass
+    /// batches that share that schema; this is not checked here.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the file cannot be created or a batch cannot be
+    /// written.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `batches` yields no record batches.
+    pub fn try_new(
+        path: PathBuf,
+        batches: impl IntoIterator<Item = RecordBatch>,
+    ) -> Result<Self> {
+        // Take the first batch before touching the filesystem, so an empty
+        // input does not leave a stray empty file behind.
+        let mut batches = batches.into_iter();
+        let first_batch = batches.next().expect("need at least one record batch");
+        let schema = first_batch.schema();
+
+        let file = File::create(&path)?;
+        let mut writer = WriterBuilder::new().with_header(true).build(file);
+
+        // The first batch was consumed above to discover the schema, so it
+        // must be written explicitly; otherwise its rows would be lost.
+        writer.write(&first_batch)?;
+        let mut num_rows = first_batch.num_rows();
+        for batch in batches {
+            writer.write(&batch)?;
+            num_rows += batch.num_rows();
+        }
+
+        println!("Generated test dataset with {num_rows} rows");
+
+        Ok(Self { path, schema })
+    }
+
+    /// The schema of this csv file (taken from the first written batch).
+    pub fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
+    }
+
+    /// The path to the csv file.
+    pub fn path(&self) -> &std::path::Path {
+        self.path.as_path()
+    }
+}
diff --git a/datafusion/core/src/test_util/mod.rs b/datafusion/core/src/test_util/mod.rs
index c4c84d667a..09608887c0 100644
--- a/datafusion/core/src/test_util/mod.rs
+++ b/datafusion/core/src/test_util/mod.rs
@@ -20,6 +20,8 @@
#[cfg(feature = "parquet")]
pub mod parquet;
+pub mod csv;
+
use std::any::Any;
use std::collections::HashMap;
use std::fs::File;
diff --git a/test-utils/src/data_gen.rs b/test-utils/src/data_gen.rs
index 45ad51bb44..4227f2d9a7 100644
--- a/test-utils/src/data_gen.rs
+++ b/test-utils/src/data_gen.rs
@@ -33,6 +33,7 @@ struct GeneratorOptions {
pods_per_host: Range<usize>,
containers_per_pod: Range<usize>,
entries_per_container: Range<usize>,
+ include_nulls: bool,
}
impl Default for GeneratorOptions {
@@ -42,6 +43,7 @@ impl Default for GeneratorOptions {
pods_per_host: 1..15,
containers_per_pod: 1..3,
entries_per_container: 1024..8192,
+ include_nulls: false,
}
}
}
@@ -149,13 +151,23 @@ impl BatchBuilder {
self.image.append(image).unwrap();
self.time.append_value(time);
- self.client_addr.append_value(format!(
- "{}.{}.{}.{}",
- rng.gen::<u8>(),
- rng.gen::<u8>(),
- rng.gen::<u8>(),
- rng.gen::<u8>()
- ));
+ if self.options.include_nulls {
+ // Append a null value if the option is set
+ // Use both "NULL" as a string and a null value
+ if rng.gen_bool(0.5) {
+ self.client_addr.append_null();
+ } else {
+ self.client_addr.append_value("NULL");
+ }
+ } else {
+ self.client_addr.append_value(format!(
+ "{}.{}.{}.{}",
+ rng.gen::<u8>(),
+ rng.gen::<u8>(),
+ rng.gen::<u8>(),
+ rng.gen::<u8>()
+ ));
+ }
self.request_duration.append_value(rng.gen());
self.request_user_agent
.append_value(random_string(rng, 20..100));
@@ -317,6 +329,12 @@ impl AccessLogGenerator {
self.options.entries_per_container = range;
self
}
+
+    /// Sets whether the generated `client_addr` column contains null-like
+    /// values instead of random IPv4-style addresses.
+    ///
+    /// When enabled, each `client_addr` entry is randomly either a real null
+    /// or the literal string `"NULL"`.
+    pub fn with_include_nulls(mut self, include_nulls: bool) -> Self {
+        self.options.include_nulls = include_nulls;
+        self
+    }
}
impl Iterator for AccessLogGenerator {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]