devinjdangelo opened a new pull request, #7632:
URL: https://github.com/apache/arrow-datafusion/pull/7632

   This PR is a draft since it cannot compile without the upstream arrow-rs changes: #here
   
   ## Which issue does this PR close?
   
   Closes #7591 
   Closes #7589
   Related to https://github.com/apache/arrow-rs/issues/1718
   
   ## Rationale for this change
   
   The parallel parquet serialization process implemented in #7562 did not support all parquet metadata (indexes/bloom filters) and applied no backpressure to serialization tasks. This PR addresses both deficiencies.
   
   ## What changes are included in this PR?
   
   - Parallel parquet serialization tasks now use `ArrowRowGroupWriter` directly rather than `ArrowWriter`.
   - Upstream `arrow-rs` changes have been filed to make `ArrowRowGroupWriter` public and `Send` so it can be held across an `.await`.
   - Parquet serialization tasks can now be throttled via a bounded channel; a sketch of the idea follows this list.
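
   As a rough illustration of the throttling mechanism, here is a minimal, self-contained sketch (not the PR's actual code): serialization tasks are spawned eagerly, but a bounded tokio `mpsc` channel caps how many serialized-but-unwritten row groups can be outstanding at once. `SerializedRowGroup`, the loop bounds, and the task bodies are invented placeholders.

   ```rust
   use tokio::sync::mpsc;
   use tokio::task::JoinHandle;

   /// Stand-in for one serialized row group; the real code produces
   /// column chunks via `ArrowRowGroupWriter`.
   struct SerializedRowGroup(Vec<u8>);

   #[tokio::main]
   async fn main() {
       // A bounded channel of size N is the backpressure mechanism: once N
       // serialization tasks are queued and unconsumed, the producer blocks.
       let channel_limit = 4;
       let (tx, mut rx) = mpsc::channel::<JoinHandle<SerializedRowGroup>>(channel_limit);

       // Producer: spawn one serialization task per row group.
       let producer = tokio::spawn(async move {
           for i in 0..16u8 {
               let handle = tokio::spawn(async move {
                   // CPU-bound encoding/compression would happen here.
                   SerializedRowGroup(vec![i; 1024])
               });
               // Blocks when `channel_limit` tasks are already in flight.
               tx.send(handle).await.expect("receiver dropped");
           }
       });

       // Consumer: drain tasks in order and append to the single output file.
       while let Some(handle) = rx.recv().await {
           let row_group = handle.await.expect("serialization task panicked");
           // In the real writer this is where bytes are flushed to disk.
           println!("writing {} bytes", row_group.0.len());
       }

       producer.await.unwrap();
   }
   ```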
   
   ## Benchmarking Results
   
   The results show the parallel parquet process in this PR is ~10% faster than the previous implementation, in addition to supporting statistics/bloom filters. channel_limit=N means that at most N parallel parquet serialization tasks may run at one time. Surprisingly, setting this number low can actually increase peak memory usage.
   
   See #7562 for the benchmarking script used.
   
   ### Test 1, All 16 Columns, ~3.6GB Parquet File (release build)
   
   #### Execution Time (s)
   Parallelism | Main (single_file_output=false) | Main (single_file_output=true) | This PR (single_file_output=true, channel_limit=100) | This PR (single_file_output=true, channel_limit=4)
   -- | -- | -- | -- | --
   1 | 22.48 | 22.53 | 21.04 | 22.17
   4 | 12.24 | 14.4 | 12.49 | 12.73
   8 | 10.79 | 12.37 | 10.7 | 11.03
   16 | 10.52 | 12.67 | 10.78 | 10.85
   32 | 10.91 | 12.07 | 10.31 | 10.25
   64 | 10.21 | 12.97 | 12.34 | 11.62
   
   #### Peak Memory Usage (MB)
   Parallelism | Main (single_file_output=false) | Main (single_file_output=true) | This PR (single_file_output=true, channel_limit=100) | This PR (single_file_output=true, channel_limit=4)
   -- | -- | -- | -- | --
   1 | 1753.3 | 1758 | 1757.1 | 1760
   4 | 2445.4 | 7104 | 5690.7 | 5684.2
   8 | 3387 | 7623.1 | 6642 | 7804
   16 | 5047.6 | 8262.6 | 8151 | 10437.7
   32 | 7683.6 | 7672.6 | 9358 | 11657.5
   64 | 10961.1 | 10370.2 | 10388 | 12898
   
   ### Test 2, Subset of 3 Columns, ~895MB Parquet File (release build)
   
   #### Execution Time (s)
   Parallelism | Main (single_file_output=false) | Main (single_file_output=true) | This PR (single_file_output=true, channel_limit=100) | This PR (single_file_output=true, channel_limit=4)
   -- | -- | -- | -- | --
   1 | 3.57 | 3.15 | 3.38 | 3.39
   4 | 1.78 | 2.37 | 2.53 | 2.3
   8 | 1.45 | 2.07 | 2.58 | 2.23
   16 | 1.54 | 2.09 | 2.06 | 1.71
   32 | 1.7 | 2.1 | 2.08 | 1.65
   64 | 1.89 | 2.72 | 2.76 | 2.07
   
   #### Peak Memory Usage (MB)
   Parallelism | Main (single_file_output=false) | Main (single_file_output=true) | This PR (single_file_output=true, channel_limit=100) | This PR (single_file_output=true, channel_limit=4)
   -- | -- | -- | -- | --
   1 | 450.6 | 451.6 | 448.9 | 449.4
   4 | 584.5 | 1659.1 | 1257.0 | 1284.4
   8 | 759.4 | 1939.7 | 1225.7 | 1339.2
   16 | 1045.8 | 2051.2 | 1359.8 | 1438.5
   32 | 1564.6 | 1899.7 | 1445.5 | 1545.3
   64 | 2318.8 | 1726.1 | 1732.0 | 1735.3
   
   ## Are these changes tested?
   
   Yes, by existing tests and ad hoc benchmarking.
   
   ## Are there any user-facing changes?
   
   No
   
   ## Next Steps
   
   So far, parquet serialization is only parallelized across RowGroups. This caps the available parallelism at the number of RowGroups we want in the file, which can be as low as 1. Parquet files generally have many columns, so we could additionally parallelize at the column level for further speedups, as sketched below.
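
   As a hypothetical illustration of column-level parallelism (plain scoped threads and a toy encoder stand in for the real arrow-rs column writers), each column of a row group is encoded independently and the results are collected back in column order:

   ```rust
   use std::thread;

   // Toy "encoder" standing in for per-column parquet encoding work.
   fn encode_column(values: &[i32]) -> Vec<u8> {
       values.iter().flat_map(|v| v.to_le_bytes()).collect()
   }

   fn main() {
       // One vector per column of a row group.
       let columns: Vec<Vec<i32>> = (0..8).map(|c| vec![c; 1_000]).collect();

       // Encode every column on its own thread. Scoped threads let us borrow
       // `columns` without cloning, and collecting the handles first ensures
       // the threads actually run concurrently before we join them in order.
       let encoded: Vec<Vec<u8>> = thread::scope(|s| {
           let handles: Vec<_> = columns
               .iter()
               .map(|col| s.spawn(move || encode_column(col)))
               .collect();
           handles.into_iter().map(|h| h.join().unwrap()).collect()
       });

       println!("encoded {} column chunks", encoded.len());
   }
   ```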
   
   We could also break free of the Parallelism=RowGroupNumber limit if it were possible to concatenate (`ArrowColumnChunk`, `ColumnCloseResult`) tuples together before writing them into a RowGroup. This may not be possible to do efficiently, since `ArrowColumnChunk`s are already compressed. Aggregating column min/max statistics would be trivial, but merging distinct-value counts and bloom filters would not be.
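
   To make the "trivial" half of that claim concrete, here is a hypothetical sketch of merging per-chunk min/max statistics (the `MinMax` type is an invented stand-in; the real values live in each chunk's `ColumnCloseResult` metadata). No analogous one-line merge exists for distinct-value counts:

   ```rust
   /// Per-chunk min/max statistics for a single column (invented stand-in).
   #[derive(Clone, Copy, Debug)]
   struct MinMax {
       min: i64,
       max: i64,
   }

   /// Merging min/max across concatenated chunks is a simple fold: the row
   /// group's min is the min of the chunk mins, and likewise for max.
   fn merge_stats(chunks: &[MinMax]) -> Option<MinMax> {
       chunks.iter().copied().reduce(|a, b| MinMax {
           min: a.min.min(b.min),
           max: a.max.max(b.max),
       })
   }

   fn main() {
       let chunks = [MinMax { min: 3, max: 40 }, MinMax { min: -7, max: 12 }];
       println!("{:?}", merge_stats(&chunks)); // Some(MinMax { min: -7, max: 40 })
   }
   ```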