metesynnada commented on PR #8802:
URL: 
https://github.com/apache/arrow-datafusion/pull/8802#issuecomment-1884895731

   A direct comparison was conducted between the `main` and the 
`upstream/spawn-blocking-for-se` branches. The scenario involved reading data 
from a file and writing the results to another file. Initially, the approach 
was to read from `MemoryExec` and write to a file. However, no significant 
differences were observed between the two branches. Therefore, I decided not to 
present those results.
   
   ## Data generation
   
   TPCH data is used with scale 0.1.
   
   ## Benchmark Code
   
   The benchmark code
   
   ```rust
   use criterion::Criterion;
   use criterion::{criterion_group, criterion_main};
   use criterion::{BatchSize, BenchmarkId};
   use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaBuilder, 
SchemaRef};
   use datafusion::catalog::TableReference;
   use datafusion::common::Result;
   use datafusion::datasource::file_format::csv::CsvFormat;
   use datafusion::datasource::listing::{
       ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
   };
   use datafusion::prelude::{SessionConfig, SessionContext};
   use std::sync::Arc;
   use tokio::runtime::Builder;
   
   fn register_csv(
       ctx: &SessionContext,
       table_name: &str,
       schema: SchemaRef,
       table_path: impl AsRef<str>,
   ) -> Result<()> {
       let file_format = CsvFormat::default()
           .with_has_header(false)
           .with_delimiter(b'|');
   
       let options = ListingOptions::new(Arc::new(file_format))
           .with_file_extension(".tbl")
           .with_target_partitions(ctx.copied_config().batch_size());
       let table_path = ListingTableUrl::parse(table_path)?;
   
       let config = ListingTableConfig::new(table_path)
           .with_listing_options(options)
           .with_schema(schema);
       let table = ListingTable::try_new(config)?;
       ctx.register_table(
           TableReference::Bare {
               table: table_name.into(),
           },
           Arc::new(table),
       )?;
       Ok(())
   }
   
   async fn execute_query(ctx: SessionContext, sql: String) {
       ctx.sql(&sql).await.unwrap().collect().await.unwrap();
   }
   
   fn get_tpch_table_schema(table: &str) -> Schema {
       match table {
           "orders" => Schema::new(vec![
               Field::new("o_orderkey", DataType::Int64, false),
               Field::new("o_custkey", DataType::Int64, false),
               Field::new("o_orderstatus", DataType::Utf8, false),
               Field::new("o_totalprice", DataType::Decimal128(15, 2), false),
               Field::new("o_orderdate", DataType::Date32, false),
               Field::new("o_orderpriority", DataType::Utf8, false),
               Field::new("o_clerk", DataType::Utf8, false),
               Field::new("o_shippriority", DataType::Int32, false),
               Field::new("o_comment", DataType::Utf8, false),
           ]),
   
           "lineitem" => Schema::new(vec![
               Field::new("l_orderkey", DataType::Int64, false),
               Field::new("l_partkey", DataType::Int64, false),
               Field::new("l_suppkey", DataType::Int64, false),
               Field::new("l_linenumber", DataType::Int32, false),
               Field::new("l_quantity", DataType::Decimal128(15, 2), false),
               Field::new("l_extendedprice", DataType::Decimal128(15, 2), 
false),
               Field::new("l_discount", DataType::Decimal128(15, 2), false),
               Field::new("l_tax", DataType::Decimal128(15, 2), false),
               Field::new("l_returnflag", DataType::Utf8, false),
               Field::new("l_linestatus", DataType::Utf8, false),
               Field::new("l_shipdate", DataType::Date32, false),
               Field::new("l_commitdate", DataType::Date32, false),
               Field::new("l_receiptdate", DataType::Date32, false),
               Field::new("l_shipinstruct", DataType::Utf8, false),
               Field::new("l_shipmode", DataType::Utf8, false),
               Field::new("l_comment", DataType::Utf8, false),
           ]),
           _ => unimplemented!("Table: {}", table),
       }
   }
   
   pub fn get_tbl_tpch_table_schema(table: &str) -> Schema {
       let mut schema = 
SchemaBuilder::from(get_tpch_table_schema(table).fields);
       schema.push(Field::new("__placeholder", DataType::Utf8, true));
       schema.finish()
   }
   
   fn delete_file(path: &str) {
       std::fs::remove_file(path).expect("Failed to delete file");
   }
   
    /// Criterion benchmark: for every (batch size, target partition count,
    /// tokio worker-thread count) combination, time a `COPY (SELECT ...)` of
    /// the lineitem/orders join into a fresh CSV file.
    fn from_elem_reading(c: &mut Criterion) {
        // Parameter grid swept by the benchmark.
        let batch_sizes = [100, 1000, 3000];
        let target_partitions = [1, 4];
        let worker_threads = [1, 4];
        let mut group = c.benchmark_group("parameter group");
        for batch_size in batch_sizes.iter() {
            for target_partition in target_partitions {
                for worker_thread in worker_threads {
                    group.bench_with_input(BenchmarkId::new(
                        format!("sink_bs{}_tp{}", batch_size, target_partition),
                        worker_thread,
                    ), &(batch_size, target_partition, worker_thread), |b, &(batch_size, target_partition, worker_thread)| {
                        // A dedicated runtime per configuration so the
                        // worker-thread count under test is honored.
                        let rt = Builder::new_multi_thread()
                            .worker_threads(worker_thread)
                            .build()
                            .unwrap();
                        // iter_batched: setup (untimed) builds the context
                        // and SQL; the routine (timed) runs the COPY.
                        b.to_async(rt).iter_batched(
                            || {
                                let csv_file = tempfile::Builder::new()
                                    .prefix("foo")
                                    .suffix(".csv")
                                    .tempfile()
                                    .unwrap();
                                let path = csv_file.path().to_str().unwrap().to_string();
                                // NOTE(review): `csv_file` is dropped at the
                                // end of this setup closure, which removes
                                // the temp file; only `path` survives and
                                // COPY recreates the file there — confirm
                                // this is the intended lifecycle.
                                let config = SessionConfig::new()
                                    .with_coalesce_batches(false)
                                    .with_batch_size(*batch_size)
                                    .with_target_partitions(target_partition);
                                let ctx = SessionContext::new_with_config(config);
                                // Hard-coded TPC-H SF0.1 data paths: adjust
                                // to the local data directory before running.
                                let lineitem = Arc::new(get_tbl_tpch_table_schema("lineitem"));
                                register_csv(&ctx, "lineitem", lineitem, "/path/to/tpch_sf0.1/lineitem.tbl").unwrap();
                                let orders = Arc::new(get_tbl_tpch_table_schema("orders"));
                                register_csv(&ctx, "orders", orders, "/path/to/tpch_sf0.1/orders.tbl").unwrap();
                                let sql = format!(
                                    "COPY (SELECT * FROM
                                    lineitem, orders
                                    where
                                    l_orderkey = o_orderkey)
                                    to '{path}' (format csv);"
                                );
                                (path, ctx, sql)
                            },
                            |(path, clone_ctx, sql)| async move {
                                // Timed section: run the COPY, then remove
                                // the output so the next iteration starts
                                // clean.
                                execute_query(clone_ctx, sql).await;
                                delete_file(&path);
                            },
                            BatchSize::LargeInput,
                        );
                    });
                }
            }
        }
        group.finish();
    }
   
    // Register the benchmark group and generate the criterion `main` entry point.
    criterion_group!(benches, from_elem_reading);
    criterion_main!(benches);
   ```
   
   ## Results
   
   | Batch Size | Target Partitions | Worker Thread | main | 
upstream/spawn-blocking-for-se | Change |
   | --- | --- | --- | --- | --- | --- |
   | 100 | 1 | 1 | [1.3831 s 1.3996 s 1.4189 s] | [592.50 ms 596.27 ms 600.22 
ms] | -57.396% |
   | 100 | 1 | 4 | [685.99 ms 699.18 ms 718.43 ms] | [598.63 ms 603.06 ms 
607.75 ms] | -13.748% |
   | 100 | 4 | 1 | [1.7561 s 1.7670 s 1.7815 s] | [1.0961 s 1.1026 s 1.1101 s] 
| -37.600% |
   | 100 | 4 | 4 | [608.23 ms 610.23 ms 612.30 ms] | [554.09 ms 556.52 ms 
559.10 ms] | -8.8015% |
   | 1000 | 1 | 1 | [1.2395 s 1.2435 s 1.2476 s] | [454.51 ms 458.97 ms 463.63 
ms] | -63.090% |
   | 1000 | 1 | 4 | [527.75 ms 529.60 ms 531.56 ms] | [430.79 ms 433.86 ms 
437.25 ms] | -18.077% |
   | 1000 | 4 | 1 | [1.3800 s 1.3892 s 1.3984 s] | [533.50 ms 536.24 ms 539.02 
ms] | -61.400% |
   | 1000 | 4 | 4 | [372.48 ms 374.06 ms 375.66 ms] | [299.15 ms 301.95 ms 
305.48 ms] | -19.278% |
   | 3000 | 1 | 1 | [1.2785 s 1.2860 s 1.2934 s] | [464.70 ms 473.55 ms 482.60 
ms] | -63.178% |
   | 3000 | 1 | 4 | [518.10 ms 519.63 ms 521.23 ms] | [417.72 ms 419.75 ms 
422.10 ms] | -19.221% |
   | 3000 | 4 | 1 | [1.3351 s 1.3410 s 1.3471 s] | [498.30 ms 501.91 ms 505.76 
ms] | -62.573% |
   | 3000 | 4 | 4 | [351.96 ms 353.56 ms 355.25 ms] | [274.52 ms 277.74 ms 
281.95 ms] | -21.443% |
   
   I tried my best to make an objective benchmark, but I might have missed 
something. 

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to