devinjdangelo opened a new issue, #7079:
URL: https://github.com/apache/arrow-datafusion/issues/7079
### Is your feature request related to a problem or challenge?
Related to #6983. I noticed the same performance issue when writing a single
large partition/file from a DataFrame: only a single core is used, and the
write can take quite a long time. When a small number of large partitions is
being written, it would be ideal to leverage multiple cores, especially now
that we are leveraging multipart ObjectStore uploads for writes (#6987).
### Describe the solution you'd like
This part of the write methods would need to process the `RecordBatch` stream
in parallel (perhaps with `try_for_each_concurrent`):
```rust
while let Some(next_batch) = stream.next().await {
    let batch = next_batch?;
    writer.write(&batch).await?;
}
```
This could be nontrivial for stateful writers like `AsyncArrowWriter`. It is
also not immediately clear to me how the multipart upload context could be
shared safely across concurrently executing threads.
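As a rough illustration of the shape this could take, here is a minimal
sketch. The helper `write_concurrent` is hypothetical, and the concurrency
limit of 8 is arbitrary. Note that the mutex keeps the stateful writer
consistent but also serializes the actual `write` calls, which is exactly why
stateful writers make this nontrivial:

```rust
use std::sync::Arc;

use datafusion::arrow::record_batch::RecordBatch;
use datafusion_common::DataFusionError;
use futures::{Stream, TryStreamExt};
use parquet::arrow::AsyncArrowWriter;
use tokio::io::AsyncWrite;
use tokio::sync::Mutex;

/// Hypothetical helper: drain `stream` into `writer` with up to 8 batches
/// in flight at once.
async fn write_concurrent<W, S>(
    writer: AsyncArrowWriter<W>,
    stream: S,
) -> Result<(), DataFusionError>
where
    W: AsyncWrite + Unpin + Send,
    S: Stream<Item = Result<RecordBatch, DataFusionError>>,
{
    let writer = Arc::new(Mutex::new(writer));
    stream
        .try_for_each_concurrent(8, |batch| {
            let writer = Arc::clone(&writer);
            async move {
                // The lock keeps the stateful writer consistent, but it
                // also serializes the writes, so this alone does not buy
                // real parallelism for a single large file.
                writer.lock().await.write(&batch).await?;
                Ok(())
            }
        })
        .await
}
```

Real speedups would likely require splitting the encoding work itself (for
example, across columns or row groups) rather than just driving the batch
loop concurrently.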
### Describe alternatives you've considered
You can repartition your DataFrame into more partitions and write out smaller
files (see the sketch below), but sometimes you really do want large files to
be written.
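For reference, the workaround looks roughly like this (a sketch; the
`RoundRobinBatch(16)` partition count is an arbitrary choice for
illustration):

```rust
use datafusion::physical_plan::Partitioning;

// Spread the data across 16 partitions so each can be written on its own
// core, at the cost of producing 16 smaller files instead of one large one.
let df = df.repartition(Partitioning::RoundRobinBatch(16))?;
df.write_parquet("file://local/home/dev/arrow-datafusion/test_out/", None)
    .await?;
```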
### Additional context
To reproduce this (adapted from @alamb's example in #6983):
```bash
cd datafusion/benchmarks
./bench.sh data tpch10
```
```rust
use std::{sync::Arc, time::Instant};

use datafusion::prelude::*;
use datafusion_common::DataFusionError;
use object_store::local::LocalFileSystem;
use url::Url;

const FILENAME: &str =
    "/home/dev/arrow-datafusion/benchmarks/data/tpch_sf10/lineitem/part-0.parquet";

#[tokio::main]
async fn main() -> Result<(), DataFusionError> {
    let ctx = SessionContext::new();

    // Register the local filesystem under the `file://local` URL so that
    // write_parquet resolves the output path through the ObjectStore API.
    let local = Arc::new(LocalFileSystem::new());
    let local_url = Url::parse("file://local").unwrap();
    ctx.runtime_env().register_object_store(&local_url, local);

    let read_options = ParquetReadOptions {
        file_extension: ".parquet",
        table_partition_cols: vec![],
        parquet_pruning: None,
        skip_metadata: None,
    };
    let df = ctx.read_parquet(FILENAME, read_options).await?;

    let start = Instant::now();
    println!("datafusion start -> {:?}", chrono::offset::Local::now());
    df.write_parquet("file://local/home/dev/arrow-datafusion/test_out/", None)
        .await?;
    let elapsed = Instant::now() - start;
    println!("datafusion end -> {:?} {elapsed:?}", chrono::offset::Local::now());

    Ok(())
}
```
This took 379s on my machine.