devinjdangelo opened a new pull request, #7562:
URL: https://github.com/apache/arrow-datafusion/pull/7562

   ## Which issue does this PR close?
   
   Related to https://github.com/apache/arrow-rs/issues/1718
   
   ## Rationale for this change
   
   #7452 sped up writing of CSV and JSON files (even when writing only a single large file) by serializing RecordBatches in parallel on multiple threads. This PR attempts to do the same for Parquet files. This is more challenging than for CSV/JSON because Parquet serialization is not embarrassingly parallel.
   
   #7483 enabled writing _different_ Parquet files in parallel; this PR attempts to allow writing a _single_ Parquet file in parallel.
   
   ## What changes are included in this PR?
   
   This PR implements the following strategy to parallelize writing a single Parquet file:
   
   1. For each incoming RecordBatch stream, serialize a Parquet file to an in-memory buffer, in parallel with the other streams.
   2. As files from step 1 finish, consume them in order on another task and stitch them together into one large Parquet file.
   3. As step 2 progresses, flush bytes from a shared buffer to an ObjectStore in multiple parts.
   
   Steps 2 and 3 are streaming/incremental, but step 1 is not. In general, all N mini-parquet files are likely to be buffered in memory before they are consumed in steps 2 and 3.
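
The three steps can be illustrated with a minimal sketch using only the standard library. It is not the PR's implementation: `serialize_partition` and `write_single_file` are hypothetical names, a trivial byte encoding stands in for Parquet serialization, and plain concatenation stands in for stitching (real Parquet stitching is more involved than concatenation).

```rust
use std::thread;

/// Hypothetical stand-in for "serialize one RecordBatch stream to an
/// in-memory Parquet file". Here each row is just written as 4 LE bytes.
fn serialize_partition(rows: &[u32]) -> Vec<u8> {
    let mut buf = Vec::with_capacity(rows.len() * 4);
    for row in rows {
        buf.extend_from_slice(&row.to_le_bytes());
    }
    buf
}

/// Hypothetical driver: serialize every partition on its own thread,
/// then combine the finished buffers in their original order.
fn write_single_file(partitions: Vec<Vec<u32>>) -> Vec<u8> {
    // Step 1: start one serializer per partition; all run concurrently
    // and each holds its complete output in memory.
    let handles: Vec<thread::JoinHandle<Vec<u8>>> = partitions
        .into_iter()
        .map(|rows| thread::spawn(move || serialize_partition(&rows)))
        .collect();

    // Step 2: consume the finished buffers in submission order.
    // Step 3 (stand-in): append to one output buffer instead of
    // flushing parts to an object store.
    let mut out = Vec::new();
    for handle in handles {
        let buf = handle.join().expect("serializer thread panicked");
        out.extend_from_slice(&buf);
    }
    out
}
```

Because each worker returns its whole buffer only when it finishes, every buffer can be resident at once, which mirrors the memory behavior described above.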
   
   Given the tradeoff between execution time and memory usage in this implementation, this PR also provides a session config option that turns off single-file parallelization. This option is useful in memory-constrained environments.
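
A sketch of turning the option off when building a session configuration. The key name is not stated in this description; the sketch assumes it is exposed as `datafusion.execution.parquet.allow_single_file_parallelism`, which should be checked against the configuration documentation of the DataFusion version in use. The function name is hypothetical.

```rust
use datafusion::prelude::SessionConfig;

/// Hypothetical helper: a session config with single-file parallel
/// Parquet writes disabled, for memory-constrained environments.
fn low_memory_config() -> SessionConfig {
    // ASSUMPTION: this key name is not given in the PR text; verify it
    // against the config docs for your DataFusion version.
    SessionConfig::new().set_bool(
        "datafusion.execution.parquet.allow_single_file_parallelism",
        false,
    )
}
```

The resulting `SessionConfig` would then be passed to whichever `SessionContext` constructor the installed version provides.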
   
   Further steps that could improve this PR:
   - Stream bytes from step 1 to step 2, so that memory consumption does not grow as quickly with increasing parallelism.
   - Enable parallelizing serialization of multiple large Parquet files. For example, a system with 32 cores needs to write out four 1GB Parquet files. Each Parquet file could be written with a parallelism of 8 (i.e. stitch together 8 smaller Parquet files for each of the 4 large Parquet files).
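
One possible shape for the first follow-up is sketched below, assuming each serializer can emit its output in chunks. It uses bounded standard-library channels so that a worker blocks once `capacity` chunks are queued, while the consumer still drains workers in order. `stream_in_order` and the fixed 4-byte payload are hypothetical and not part of this PR.

```rust
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

/// Hypothetical sketch: each worker streams chunks through its own bounded
/// channel; the consumer drains the channels one after another, in order.
fn stream_in_order(n_workers: usize, chunks_per_worker: usize, capacity: usize) -> Vec<u8> {
    let mut receivers: Vec<Receiver<Vec<u8>>> = Vec::with_capacity(n_workers);
    let mut handles = Vec::with_capacity(n_workers);

    for worker in 0..n_workers {
        let (tx, rx) = sync_channel::<Vec<u8>>(capacity);
        receivers.push(rx);
        handles.push(thread::spawn(move || {
            for _ in 0..chunks_per_worker {
                // Stand-in payload: 4 bytes tagged with the worker index.
                let chunk = vec![worker as u8; 4];
                // `send` blocks while `capacity` chunks are already queued,
                // which bounds the memory held on behalf of this worker.
                if tx.send(chunk).is_err() {
                    break; // consumer went away
                }
            }
            // `tx` is dropped here, which closes the channel.
        }));
    }

    // Drain worker 0 completely, then worker 1, and so on. Later workers
    // make progress only until their bounded buffer fills up.
    let mut out = Vec::new();
    for rx in receivers {
        for chunk in rx {
            out.extend_from_slice(&chunk);
        }
    }
    for handle in handles {
        handle.join().expect("worker thread panicked");
    }
    out
}
```

The tradeoff in this design is that workers later in the order stall once their buffer is full, so a smaller `capacity` lowers peak memory at the cost of less overlap between workers.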
   
   ## Benchmarking
   All tests are run on a system with 16 cores / 32 threads. The following script is used to write Parquet file(s) and capture execution time and peak memory consumption.
   
   ```rust
   use datafusion::{dataframe::DataFrameWriteOptions, prelude::*};
   use datafusion_common::DataFusionError;
   use object_store::local::LocalFileSystem;
   use peak_alloc::PeakAlloc;
   use std::{sync::Arc, time::Instant};
   use url::Url;
   
   #[global_allocator]
   static PEAK_ALLOC: PeakAlloc = PeakAlloc;
   
   const FILENAME: &str =
       "/home/dev/arrow-datafusion/benchmarks/data/tpch_sf10/lineitem/part-0.parquet";
   
   #[tokio::main]
   async fn main() -> Result<(), DataFusionError> {
       let mut runtimes = Vec::new();
       let mut max_memory = Vec::new();
       for parallelism in [64, 32, 16, 8, 4, 1] {
           PEAK_ALLOC.reset_peak_usage();
           let _ctx = SessionContext::new();
           let local = Arc::new(LocalFileSystem::new());
           let local_url = Url::parse("file://local").unwrap();
           _ctx.runtime_env().register_object_store(&local_url, local);
   
           let _read_options = ParquetReadOptions::default();
   
   
           let _df = _ctx
               .read_parquet(FILENAME, _read_options.clone())
               .await
               .unwrap()
               // Optionally select a subset of 3 columns
               //.select_columns(&["l_orderkey", "l_partkey", "l_receiptdate"])?
               .repartition(Partitioning::Hash(vec![col("l_orderkey")], parallelism))?;
   
           println!("Number of columns: {}", _df.schema().field_names().len());
   
           let out_path = format!(
               "file://local/home/dev/arrow-datafusion/test_out/bench{parallelism}.parquet"
           );
   
           let start3 = Instant::now();
           _df.clone()
               .write_parquet(
                   out_path.as_str(),
                   DataFrameWriteOptions::new().with_single_file_output(true),
                   None,
               )
               .await?;
           let elapsed3 = Instant::now() - start3;
           println!("write as parquet with parallelism {parallelism} to disk took -> {elapsed3:?}");
           runtimes.push(elapsed3);
   
           let peak_mem = PEAK_ALLOC.peak_usage_as_mb();
           println!(
               "Peak memory usage with parallelism {parallelism} is: {}MB",
               peak_mem
           );
           max_memory.push(peak_mem);
       }
   
       println!("Runtimes: {:?}", runtimes);
       println!("Peak memory: {:?}", max_memory);
   
       Ok(())
   }
   ```
   
   Notes for test results:
   - parallelism=1 always falls back to the same AsyncArrowWriter implementation used on the current main branch
   - "Output multiple files" is the implementation developed in #7483 to write independent files in parallel
   - "Output single file (flush each subfile)" is the implementation in this PR
   - "Output single file (flush each 10MB)" is an alternative implementation in #(to open)
   
   ### Test 1, All 16 Columns, ~3.6GB Parquet File
   #### Execution Time (s)
   Parallelism | Output multiple files** | Output Single File (flush each subfile) | Output Single File (flush each 10MB)
   -- | -- | -- | --
   1* | 277.77 | 267.60 | 266.29
   4 | 83.69 | 88.25 | 133.41
   8 | 47.36 | 51.87 | 97.09
   16 | 35.79 | 39.92 | 84.65
   32 | 31.71 | 35.47 | 81.64
   64 | 32.23 | 36.21 | 82.24
   
   #### Peak Memory Consumption (MB)
   Parallelism | Output multiple files** | Output Single File (flush each subfile) | Output Single File (flush each 10MB)
   -- | -- | -- | --
   1* | 1753.9 | 1758.9 | 1759.7
   4 | 2465.2 | 7082.4 | 7092.9
   8 | 3402.5 | 7611.2 | 7604.8
   16 | 5145.3 | 7041.2 | 7884.3
   32 | 7795.9 | 7720.1 | 7720.7
   64 | 10957.3 | 10480.9 | 10290.9
   
   ### Test 2, Subset of 3 Columns, ~895MB Parquet File
   #### Execution Time (s)
   Parallelism | Output multiple files** | Output Single File (flush each subfile) | Output Single File (flush each 10MB)
   -- | -- | -- | --
   1* | 42.82 | 40.52 | 40.36
   4 | 14.24 | 14.98 | 29.73
   8 | 7.77 | 8.84 | 20.44
   16 | 5.33 | 6.59 | 18.93
   32 | 4.80 | 5.34 | 17.40
   64 | 4.98 | 5.54 | 17.78
   
   #### Peak Memory Consumption (MB)
   Parallelism | Output multiple files** | Output Single File (flush each subfile) | Output Single File (flush each 10MB)
   -- | -- | -- | --
   1* | 445.9 | 449.7 | 447.7
   4 | 577.1 | 1789.3 | 1693.9
   8 | 756.9 | 1923.4 | 1865.5
   16 | 1038.2 | 1940.8 | 1933.0
   32 | 1554.2 | 1899.7 | 1907.0
   64 | 2337.5 | 1710.4 | 1726.8
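
To read the tables as relative numbers, the small helpers below compute speedup and peak-memory growth against the parallelism=1 row. The function names are hypothetical; the inputs are meant to be taken directly from the tables above.

```rust
/// Speedup of a run relative to the parallelism=1 baseline (both in seconds).
fn speedup(baseline_secs: f64, run_secs: f64) -> f64 {
    baseline_secs / run_secs
}

/// Peak-memory growth of a run relative to the baseline (both in MB).
fn memory_growth(baseline_mb: f64, run_mb: f64) -> f64 {
    run_mb / baseline_mb
}

/// Speedup for every (parallelism, seconds) row, relative to the first row,
/// which is expected to be the parallelism=1 measurement.
fn speedups(rows: &[(u32, f64)]) -> Vec<(u32, f64)> {
    let baseline = rows[0].1;
    rows.iter()
        .map(|&(parallelism, secs)| (parallelism, speedup(baseline, secs)))
        .collect()
}
```

For example, for Test 1 with "Output Single File (flush each subfile)", `speedup(267.60, 35.47)` is roughly 7.5x at parallelism 32, while `memory_growth(1758.9, 7720.1)` is roughly 4.4x.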
   
   ## Are these changes tested?
   
   Yes, by existing tests.
   
   ## Are there any user-facing changes?
   
   Faster single-file Parquet writes and a new config option to enable or disable them (on by default).

