metesynnada commented on PR #8802:
URL:
https://github.com/apache/arrow-datafusion/pull/8802#issuecomment-1884895731
A direct comparison was conducted between the `main` and the
`upstream/spawn-blocking-for-se` branches. The scenario involved reading data
from a file and writing the results to another file. Initially, the approach
was to read from `MemoryExec` and write to a file. However, no significant
differences were observed between the two branches. Therefore, I decided not to
present those results.
## Data generation
TPCH data is used with scale 0.1.
## Benchmark Code
The benchmark code used is shown below:
```rust
use criterion::Criterion;
use criterion::{criterion_group, criterion_main};
use criterion::{BatchSize, BenchmarkId};
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaBuilder,
SchemaRef};
use datafusion::catalog::TableReference;
use datafusion::common::Result;
use datafusion::datasource::file_format::csv::CsvFormat;
use datafusion::datasource::listing::{
ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::prelude::{SessionConfig, SessionContext};
use std::sync::Arc;
use tokio::runtime::Builder;
/// Registers a headerless, pipe-delimited `.tbl` file as a listing table
/// named `table_name` in the given session context.
///
/// # Errors
/// Returns an error if `table_path` cannot be parsed as a listing-table URL,
/// if the listing table cannot be constructed, or if catalog registration
/// fails.
fn register_csv(
    ctx: &SessionContext,
    table_name: &str,
    schema: SchemaRef,
    table_path: impl AsRef<str>,
) -> Result<()> {
    // TPC-H .tbl files: no header row, '|' as the column delimiter.
    let file_format = CsvFormat::default()
        .with_has_header(false)
        .with_delimiter(b'|');
    let options = ListingOptions::new(Arc::new(file_format))
        .with_file_extension(".tbl")
        // BUG FIX: this previously passed `batch_size()` here, silently using
        // the record-batch size (100..3000 in this benchmark) as the number of
        // scan partitions. Propagate the session's configured partition count
        // instead, so the `target_partitions` benchmark axis actually applies
        // to the scan.
        .with_target_partitions(ctx.copied_config().target_partitions());
    let table_path = ListingTableUrl::parse(table_path)?;
    let config = ListingTableConfig::new(table_path)
        .with_listing_options(options)
        .with_schema(schema);
    let table = ListingTable::try_new(config)?;
    ctx.register_table(
        TableReference::Bare {
            table: table_name.into(),
        },
        Arc::new(table),
    )?;
    Ok(())
}
/// Plans `sql` against `ctx` and drives it to completion, buffering every
/// result batch in memory. Panics on planning or execution failure, which is
/// acceptable for benchmark code.
async fn execute_query(ctx: SessionContext, sql: String) {
    let dataframe = ctx.sql(sql.as_str()).await.unwrap();
    dataframe.collect().await.unwrap();
}
fn get_tpch_table_schema(table: &str) -> Schema {
match table {
"orders" => Schema::new(vec![
Field::new("o_orderkey", DataType::Int64, false),
Field::new("o_custkey", DataType::Int64, false),
Field::new("o_orderstatus", DataType::Utf8, false),
Field::new("o_totalprice", DataType::Decimal128(15, 2), false),
Field::new("o_orderdate", DataType::Date32, false),
Field::new("o_orderpriority", DataType::Utf8, false),
Field::new("o_clerk", DataType::Utf8, false),
Field::new("o_shippriority", DataType::Int32, false),
Field::new("o_comment", DataType::Utf8, false),
]),
"lineitem" => Schema::new(vec![
Field::new("l_orderkey", DataType::Int64, false),
Field::new("l_partkey", DataType::Int64, false),
Field::new("l_suppkey", DataType::Int64, false),
Field::new("l_linenumber", DataType::Int32, false),
Field::new("l_quantity", DataType::Decimal128(15, 2), false),
Field::new("l_extendedprice", DataType::Decimal128(15, 2),
false),
Field::new("l_discount", DataType::Decimal128(15, 2), false),
Field::new("l_tax", DataType::Decimal128(15, 2), false),
Field::new("l_returnflag", DataType::Utf8, false),
Field::new("l_linestatus", DataType::Utf8, false),
Field::new("l_shipdate", DataType::Date32, false),
Field::new("l_commitdate", DataType::Date32, false),
Field::new("l_receiptdate", DataType::Date32, false),
Field::new("l_shipinstruct", DataType::Utf8, false),
Field::new("l_shipmode", DataType::Utf8, false),
Field::new("l_comment", DataType::Utf8, false),
]),
_ => unimplemented!("Table: {}", table),
}
}
/// Returns the TPC-H schema for `table` with one extra nullable Utf8 column,
/// `__placeholder`, appended at the end.
///
/// NOTE(review): the extra column presumably absorbs the trailing '|'
/// delimiter that TPC-H `.tbl` rows end with — confirm against the data
/// generator's output format.
pub fn get_tbl_tpch_table_schema(table: &str) -> Schema {
    let base = get_tpch_table_schema(table);
    let mut builder = SchemaBuilder::from(base.fields);
    builder.push(Field::new("__placeholder", DataType::Utf8, true));
    builder.finish()
}
/// Removes the file at `path`, panicking if removal fails.
///
/// Improvement over the original: the panic message now includes the
/// offending path and the underlying I/O error, instead of the bare
/// "Failed to delete file", so a failing benchmark run is diagnosable.
fn delete_file(path: &str) {
    std::fs::remove_file(path)
        .unwrap_or_else(|e| panic!("Failed to delete file {path}: {e}"));
}
/// Criterion benchmark: COPY the lineitem/orders join out to a CSV file,
/// sweeping three axes — record-batch size, DataFusion target partitions,
/// and tokio worker threads.
fn from_elem_reading(c: &mut Criterion) {
    // Benchmark axes. Every combination below becomes one Criterion input.
    let batch_sizes = [100, 1000, 3000];
    let target_partitions = [1, 4];
    let worker_threads = [1, 4];
    let mut group = c.benchmark_group("parameter group");
    for batch_size in batch_sizes.iter() {
        for target_partition in target_partitions {
            for worker_thread in worker_threads {
                group.bench_with_input(
                    // Benchmark id encodes batch size and partitions;
                    // the worker-thread count is the BenchmarkId parameter.
                    BenchmarkId::new(
                        format!("sink_bs{}_tp{}", batch_size, target_partition),
                        worker_thread,
                    ),
                    &(batch_size, target_partition, worker_thread),
                    |b, &(batch_size, target_partition, worker_thread)| {
                        // A fresh multi-threaded tokio runtime per
                        // configuration, sized by the worker-thread axis.
                        let rt = Builder::new_multi_thread()
                            .worker_threads(worker_thread)
                            .build()
                            .unwrap();
                        b.to_async(rt).iter_batched(
                            // Setup closure: runs outside the measured
                            // section, so context creation and table
                            // registration are excluded from timings.
                            || {
                                // Unique output file per iteration; COPY
                                // writes into it and the routine deletes it.
                                let csv_file = tempfile::Builder::new()
                                    .prefix("foo")
                                    .suffix(".csv")
                                    .tempfile()
                                    .unwrap();
                                let path =
                                    csv_file.path().to_str().unwrap().to_string();
                                // Coalescing is disabled so the configured
                                // batch size reaches the sink unaltered.
                                let config = SessionConfig::new()
                                    .with_coalesce_batches(false)
                                    .with_batch_size(*batch_size)
                                    .with_target_partitions(target_partition);
                                let ctx = SessionContext::new_with_config(config);
                                let lineitem =
                                    Arc::new(get_tbl_tpch_table_schema("lineitem"));
                                // NOTE(review): hard-coded local paths — this
                                // benchmark only runs where TPC-H sf0.1 data
                                // sits at /path/to/tpch_sf0.1/.
                                register_csv(&ctx, "lineitem", lineitem,
                                    "/path/to/tpch_sf0.1/lineitem.tbl").unwrap();
                                let orders =
                                    Arc::new(get_tbl_tpch_table_schema("orders"));
                                register_csv(&ctx, "orders", orders,
                                    "/path/to/tpch_sf0.1/orders.tbl").unwrap();
                                let sql = format!(
                                    "COPY (SELECT * FROM
lineitem, orders
where
l_orderkey = o_orderkey)
to '{path}' (format csv);"
                                );
                                (path, ctx, sql)
                            },
                            // Measured closure: execute the COPY, then remove
                            // the output so successive iterations start clean.
                            |(path, clone_ctx, sql)| async move {
                                execute_query(clone_ctx, sql).await;
                                delete_file(&path);
                            },
                            BatchSize::LargeInput,
                        );
                    },
                );
            }
        }
    }
    group.finish();
}
// Register the benchmark group and generate the Criterion `main` entry point.
criterion_group!(benches, from_elem_reading);
criterion_main!(benches);
```
## Results
| Batch Size | Target Partitions | Worker Thread | main |
upstream/spawn-blocking-for-se | Change |
| --- | --- | --- | --- | --- | --- |
| 100 | 1 | 1 | [1.3831 s 1.3996 s 1.4189 s] | [592.50 ms 596.27 ms 600.22
ms] | -57.396% |
| 100 | 1 | 4 | [685.99 ms 699.18 ms 718.43 ms] | [598.63 ms 603.06 ms
607.75 ms] | -13.748% |
| 100 | 4 | 1 | [1.7561 s 1.7670 s 1.7815 s] | [1.0961 s 1.1026 s 1.1101 s]
| -37.600% |
| 100 | 4 | 4 | [608.23 ms 610.23 ms 612.30 ms] | [554.09 ms 556.52 ms
559.10 ms] | -8.8015% |
| 1000 | 1 | 1 | [1.2395 s 1.2435 s 1.2476 s] | [454.51 ms 458.97 ms 463.63
ms] | -63.090% |
| 1000 | 1 | 4 | [527.75 ms 529.60 ms 531.56 ms] | [430.79 ms 433.86 ms
437.25 ms] | -18.077% |
| 1000 | 4 | 1 | [1.3800 s 1.3892 s 1.3984 s] | [533.50 ms 536.24 ms 539.02
ms] | -61.400% |
| 1000 | 4 | 4 | [372.48 ms 374.06 ms 375.66 ms] | [299.15 ms 301.95 ms
305.48 ms] | -19.278% |
| 3000 | 1 | 1 | [1.2785 s 1.2860 s 1.2934 s] | [464.70 ms 473.55 ms 482.60
ms] | -63.178% |
| 3000 | 1 | 4 | [518.10 ms 519.63 ms 521.23 ms] | [417.72 ms 419.75 ms
422.10 ms] | -19.221% |
| 3000 | 4 | 1 | [1.3351 s 1.3410 s 1.3471 s] | [498.30 ms 501.91 ms 505.76
ms] | -62.573% |
| 3000 | 4 | 4 | [351.96 ms 353.56 ms 355.25 ms] | [274.52 ms 277.74 ms
281.95 ms] | -21.443% |
I tried my best to make this an objective benchmark, but I might have missed something.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]