devinjdangelo opened a new issue, #7079:
URL: https://github.com/apache/arrow-datafusion/issues/7079

   ### Is your feature request related to a problem or challenge?
   
   Related to #6983. I noticed the same performance issue when writing a single 
large partition/file form a DataFrame. Only a single core is used and it can 
take quite a long time. When there are a small number of large partitions being 
written it would be ideal to leverage multiple cores, especially now that we 
are leveraging multipart  ObjectStore uploads for writes #6987.
   
   
   ### Describe the solution you'd like
   
   This part of the write methods needs to process the RecordBatch stream in 
parallel (perhaps with try_for_each_concurrent):
   
   ```rust
   while let Some(next_batch) = stream.next().await {
                   let batch = next_batch?;
                   writer.write(&batch).await?;
               }
   ```
   
   This could be nontrivial for stateful writers like `AsyncArrowWriter`. It 
also isn't clear to me immediately how the multipart context could be shared 
with concurrent access across threads.
   
   ### Describe alternatives you've considered
   
   You can repartition your DataFrame to more partitions and write out smaller 
files, but sometimes you really do want large files to be written.
   
   ### Additional context
   
   To reproduce this (adapted from @alamb's example in #6983) :
   
   ```
   cd datafusion/benchmarks
   ./bench.sh data tpch10
   ```
   
   ```rust
   use std::{io::Error, time::Instant, sync::Arc};
   use datafusion::prelude::*;
   use chrono;
   use datafusion_common::DataFusionError;
   use object_store::local::LocalFileSystem;
   use url::Url;
   
   const FILENAME: &str = 
"/home/dev/arrow-datafusion/benchmarks/data/tpch_sf10/lineitem/part-0.parquet";
   
   #[tokio::main]
   async fn main() -> Result<(), DataFusionError> {
       let _ctx = SessionContext::new();
       let local = Arc::new(LocalFileSystem::new());
       let local_url = Url::parse("file://local").unwrap();
       _ctx.runtime_env().register_object_store(&local_url, local);
   
       let _read_options = ParquetReadOptions { file_extension: ".parquet", 
table_partition_cols: vec!(), parquet_pruning: None, skip_metadata: None };
       let _df = _ctx.read_parquet(FILENAME, _read_options).await.unwrap();
   
       let start = Instant::now();
       println!("datafusion start -> {:?}", chrono::offset::Local::now());
   
       _df.write_parquet("file://local/home/dev/arrow-datafusion/test_out/", 
None).await?;
       let elapsed = Instant::now() - start;
       println!("datafusion end -> {:?} {elapsed:?}", 
chrono::offset::Local::now());
       Ok(())
   }
   ```
   
   This took 379s on my machine.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to