jorgecarleitao commented on a change in pull request #8172:
URL: https://github.com/apache/arrow/pull/8172#discussion_r487305313
##########
File path: rust/datafusion/benches/aggregate_query_sql.rs
##########
@@ -39,72 +46,105 @@ fn aggregate_query(ctx: &mut ExecutionContext, sql: &str) {
for _batch in results {}
}
-fn create_context() -> ExecutionContext {
- // define schema for data source (csv file)
+fn create_data(size: usize, null_density: f64) -> Vec<Option<f64>> {
+ // use random numbers to avoid spurious compiler optimizations wrt to branching
+ let mut rng = rand::thread_rng();
+
+ (0..size)
+ .map(|_| {
+ if rng.gen::<f64>() > null_density {
+ None
+ } else {
+ Some(rng.gen::<f64>())
+ }
+ })
+ .collect()
+}
+
+fn create_context(
+ partitions_len: usize,
+ array_len: usize,
+ batch_size: usize,
+) -> Result<ExecutionContext> {
+ // define a schema.
let schema = Arc::new(Schema::new(vec![
- Field::new("c1", DataType::Utf8, false),
- Field::new("c2", DataType::UInt32, false),
- Field::new("c3", DataType::Int8, false),
- Field::new("c4", DataType::Int16, false),
- Field::new("c5", DataType::Int32, false),
- Field::new("c6", DataType::Int64, false),
- Field::new("c7", DataType::UInt8, false),
- Field::new("c8", DataType::UInt16, false),
- Field::new("c9", DataType::UInt32, false),
- Field::new("c10", DataType::UInt64, false),
- Field::new("c11", DataType::Float32, false),
- Field::new("c12", DataType::Float64, false),
- Field::new("c13", DataType::Utf8, false),
+ Field::new("utf8", DataType::Utf8, false),
+ Field::new("f32", DataType::Float32, false),
+ Field::new("f64", DataType::Float64, false),
]));
- let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined");
+ // define data.
+ let partitions = (0..partitions_len)
+ .map(|_| {
+ (0..array_len / batch_size / partitions_len)
+ .map(|i| {
+ let keys: Vec<String> = (0..batch_size)
+ .map(
+ // the 4 here is the number of different keys.
+ // a higher number increase sparseness
+ |i| format!("hi{}", i % 4),
+ )
+ .collect();
+ let keys: Vec<&str> = keys.iter().map(|e| &**e).collect();
+
+ let values = create_data(batch_size, 0.5);
+
+ RecordBatch::try_new(
+ schema.clone(),
+ vec![
+ Arc::new(StringArray::from(keys)),
+ Arc::new(Float32Array::from(vec![i as f32; batch_size])),
+ Arc::new(Float64Array::from(values)),
+ ],
+ )
+ .unwrap()
+ })
+ .collect::<Vec<_>>()
+ })
+ .collect::<Vec<_>>();
- // create CSV data source
- let csv = CsvFile::try_new(
- &format!("{}/csv/aggregate_test_100.csv", testdata),
- CsvReadOptions::new().schema(&schema),
- )
- .unwrap();
+ let mut ctx = ExecutionContext::new();
- let mem_table = MemTable::load(&csv).unwrap();
+ // declare a table in memory. In spark API, this corresponds to createDataFrame(...).
+ let provider = MemTable::new(schema, partitions)?;
+ ctx.register_table("t", Box::new(provider));
- // create local execution context
- let mut ctx = ExecutionContext::new();
- ctx.register_table("aggregate_test_100", Box::new(mem_table));
- ctx
+ Ok(ctx)
}
fn criterion_benchmark(c: &mut Criterion) {
- c.bench_function("aggregate_query_no_group_by", |b| {
- let mut ctx = create_context();
+ let partitions_len = 4;
+ let array_len = 32768; // 2^15
+ let batch_size = 2048; // 2^11
+ let mut ctx = create_context(partitions_len, array_len, batch_size).unwrap();
+
+ c.bench_function("aggregate_query_no_group_by 15 12", |b| {
Review comment:
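11, not 12: `batch_size` is 2048 = 2^11, so the benchmark name should read "aggregate_query_no_group_by 15 11".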
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]