[ 
https://issues.apache.org/jira/browse/ARROW-14965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17456009#comment-17456009
 ] 

Weston Pace commented on ARROW-14965:
-------------------------------------

The latest default value is actually min(32, os.cpu_count() + 4), which would 
be too small in this case.  It's hard to say that there is a single good 
default for this: 5 * cpu_count would be too high for an HDD (and probably 
even an SSD).  There is some prerequisite work to allow the thread pool to be 
configured per filesystem, which might help us come up with better defaults.  
I've created ARROW-15035 as a placeholder to finish that work, but I don't 
know when someone will have time to get to it.  Being able to change the 
global thread pool size is probably "good enough" for some time.
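
For reference, here is a minimal sketch of the two defaults discussed above, 
assuming they refer to the max_workers default of Python's ThreadPoolExecutor 
(5 * cpu_count in older releases, min(32, cpu_count + 4) in newer ones).  The 
helper names and the value 16 are hypothetical.  The pyarrow part assumes the 
"global thread pool" is the I/O pool set by pyarrow.set_io_thread_count, and 
it is skipped if pyarrow is not installed.

```python
import os


def newer_default_workers(cpu_count):
    """Formula quoted in the comment: min(32, cpu_count + 4)."""
    return min(32, cpu_count + 4)


def older_default_workers(cpu_count):
    """Earlier formula mentioned in the comment: 5 * cpu_count."""
    return 5 * cpu_count


cpus = os.cpu_count() or 1
print(f"cpus={cpus}")
print(f"newer default: {newer_default_workers(cpus)}")
print(f"older default: {older_default_workers(cpus)}")

# Hypothetical value for illustration only; the right size depends on the
# storage backend (HDD, SSD, object store), as discussed above.
IO_THREADS = 16

try:
    import pyarrow as pa
except ImportError:
    pa = None

if pa is not None:
    # Assumption: the "global thread pool" is Arrow's I/O thread pool.
    pa.set_io_thread_count(IO_THREADS)
    print(f"arrow io threads: {pa.io_thread_count()}")
```

The formula is capped at 32, so it stops growing with core count on large 
machines, while the older formula grows without bound.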

Also, it seems like we are not quite reaching the peak multi-process 
performance.  I agree it would be interesting to test a higher max 
connections setting.  I've created ARROW-15036 for this.

We can leave this issue open in case some other factor is preventing us from 
reaching that 2-second figure.
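
To check whether individual calls get close to that figure, per-call wall 
time under a thread pool can be recorded with something like the sketch 
below.  It uses only the standard library; fake_read_file is a hypothetical 
stand-in for the reporter's read_file, and the sleep merely simulates work 
that releases the GIL.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def timed(fn):
    """Wrap fn so that it returns (result, elapsed_seconds)."""
    def wrapper(arg):
        start = time.perf_counter()
        result = fn(arg)
        return result, time.perf_counter() - start
    return wrapper


def fake_read_file(path):
    # Hypothetical stand-in for read_file; sleeping releases the GIL.
    time.sleep(0.05)
    return path


def per_call_durations(fn, args, max_workers):
    """Run fn over args in a thread pool and return each call's wall time."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return [elapsed for _, elapsed in pool.map(timed(fn), args)]


paths = [f"file_{i}.parquet" for i in range(13)]
durations = per_call_durations(fake_read_file, paths, max_workers=13)
print(f"slowest call: {max(durations):.3f}s")
```

If the slowest call is much slower under threads than under processes, as in 
the report, the difference points at contention rather than at the work itself.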

> [Python][C++] Contention when reading Parquet files with multi-threading
> ------------------------------------------------------------------------
>
>                 Key: ARROW-14965
>                 URL: https://issues.apache.org/jira/browse/ARROW-14965
>             Project: Apache Arrow
>          Issue Type: Improvement
>          Components: C++, Python
>    Affects Versions: 6.0.0
>            Reporter: Nick Gates
>            Priority: Minor
>
> I'm attempting to read a table from multiple Parquet files where I already 
> know which row_groups I want to read from each file. I also want to apply a 
> filter expression while reading. To do this my code looks roughly like this:
>  
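> {code:python}
> from concurrent.futures import ThreadPoolExecutor
>
> import pyarrow as pa
> import pyarrow.dataset as ds
>
> def read_file(filepath):
>     # Read only the known row groups of one file, filtering during the scan.
>     parquet_format = ds.ParquetFileFormat(...)
>     fragment = parquet_format.make_fragment(filepath, row_groups=[0, 1, 2, ...])
>     scanner = ds.Scanner.from_fragment(
>         fragment,
>         use_threads=True,
>         use_async=False,
>         filter=...
>     )
>     return scanner.to_reader().read_all()
>
> # One read_file call per file, run concurrently on Python threads.
> with ThreadPoolExecutor() as pool:
>     pa.concat_tables(pool.map(read_file, file_paths))
> {code}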
> Running with a ProcessPoolExecutor, each of my 13 read_file calls takes at 
> most 2 seconds. However, with a ThreadPoolExecutor some of the read_file 
> calls take 20+ seconds.
>  
> I've tried running this with various combinations of use_threads and 
> use_async to see what's happening. The stack traces below come from 
> py-spy; contention was identified with viztracer.
>  
> *use_threads: False, use_async: False*
>  * It looks like pyarrow._dataset.Scanner.to_reader doesn't release the GIL: 
> [https://github.com/apache/arrow/blob/be9a22b9b76d9cd83d85d52ffc2844056d90f367/python/pyarrow/_dataset.pyx#L3278-L3283]
>  * pyarrow._dataset.from_fragment seems to be contended. Py-spy suggests this 
> is around getting the physical_schema from the fragment?
>  
> {code:java}
> from_fragment (pyarrow/_dataset.cpython-37m-x86_64-linux-gnu.so)
> __pyx_getprop_7pyarrow_8_dataset_8Fragment_physical_schema (pyarrow/_dataset.cpython-37m-x86_64-linux-gnu.so)
> __pthread_cond_timedwait (libpthread-2.17.so) {code}
>  
> *use_threads: False, use_async: True*
>  * There's no longer any contention for pyarrow._dataset.from_fragment
>  * But there's lots of contention for pyarrow.lib.RecordBatchReader.read_all
>  
> {code:java}
> arrow::RecordBatchReader::ReadAll (pyarrow/libarrow.so.600)
> arrow::dataset::(anonymous namespace)::ScannerRecordBatchReader::ReadNext (pyarrow/libarrow_dataset.so.600)
> arrow::Iterator<arrow::dataset::TaggedRecordBatch>::Next<arrow::GeneratorIterator<arrow::dataset::TaggedRecordBatch> > (pyarrow/libarrow_dataset.so.600)
> arrow::FutureImpl::Wait (pyarrow/libarrow.so.600) 
> std::condition_variable::wait (libstdc++.so.6.0.19){code}
> *use_threads: True, use_async: False*
>  * Appears to be some contention on Scanner.to_reader
>  * But most contention remains for RecordBatchReader.read_all
> {code:java}
> arrow::RecordBatchReader::ReadAll (pyarrow/libarrow.so.600)
> arrow::dataset::(anonymous namespace)::ScannerRecordBatchReader::ReadNext (pyarrow/libarrow_dataset.so.600)
> arrow::Iterator<arrow::dataset::TaggedRecordBatch>::Next<arrow::FunctionIterator<arrow::dataset::(anonymous namespace)::SyncScanner::ScanBatches(arrow::Iterator<std::shared_ptr<arrow::dataset::ScanTask> >)::{lambda()#1}, arrow::dataset::TaggedRecordBatch> > (pyarrow/libarrow_dataset.so.600)
> std::condition_variable::wait (libstdc++.so.6.0.19)
> __pthread_cond_wait (libpthread-2.17.so) {code}
> *use_threads: True, use_async: True*
>  * Contention again mostly for RecordBatchReader.read_all, but seems to 
> complete in ~12 seconds rather than 20
> {code:java}
> arrow::RecordBatchReader::ReadAll (pyarrow/libarrow.so.600)
> arrow::dataset::(anonymous namespace)::ScannerRecordBatchReader::ReadNext (pyarrow/libarrow_dataset.so.600)
> arrow::Iterator<arrow::dataset::TaggedRecordBatch>::Next<arrow::GeneratorIterator<arrow::dataset::TaggedRecordBatch> > (pyarrow/libarrow_dataset.so.600)
> arrow::FutureImpl::Wait (pyarrow/libarrow.so.600)
> std::condition_variable::wait (libstdc++.so.6.0.19)
> __pthread_cond_wait (libpthread-2.17.so) {code}
> Is this expected behaviour? Or should it be possible to achieve the same 
> performance from multi-threading as from multi-processing?
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
