tustvold commented on issue #1473:
URL: https://github.com/apache/arrow-rs/issues/1473#issuecomment-1085914732


   An update on this: I implemented the proposal in #1509, along with a 
number of modifications to improve its performance (e.g. avoiding buffer 
copies). I then created an 
[updated DataFusion branch](https://github.com/tustvold/arrow-datafusion/tree/parquet-async-wip) 
that makes use of it. Unfortunately the performance was still significantly 
worse than master :cry:
   
   ## Scheduler Bench
   
   The DataFusion compile times were starting to grate, and there were far too 
many variables, so I created 
[scheduler-bench](https://github.com/tustvold/scheduler-bench) which is a 
reduced version of one of the queries that was exhibiting problematic runtime 
characteristics in the DataFusion benchmark.
   
   Running this gives the following:
   
   ```
   tokio_sync_file_test (2048): min: 0.0534s, max: 0.0660s, avg: 0.0548s, p95: 0.0557s
   tokio_par_async_spawn_blocking_test (2048): min: 0.0484s, max: 0.0813s, avg: 0.0585s, p95: 0.0754s
   tokio_par_async_blocking_test (2048): min: 0.0563s, max: 0.0721s, avg: 0.0584s, p95: 0.0600s
   tokio_par_sync_test (2048): min: 0.0536s, max: 0.0563s, avg: 0.0546s, p95: 0.0554s
   ```
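
For anyone wanting to reproduce the summary columns from their own 
per-iteration timings, here is a minimal sketch. It is not the scheduler-bench 
code: the `Summary` struct and `summarize` function are hypothetical names, and 
the nearest-rank definition of p95 is an assumption, so the real tool may 
compute it differently.

```rust
use std::time::Duration;

/// Summary statistics for a set of per-iteration timings (hypothetical type).
#[derive(Debug, PartialEq)]
struct Summary {
    min: Duration,
    max: Duration,
    avg: Duration,
    p95: Duration,
}

/// Summarise `samples`, returning `None` if the slice is empty.
/// The p95 uses the nearest-rank method, which is an assumption here.
fn summarize(samples: &[Duration]) -> Option<Summary> {
    if samples.is_empty() {
        return None;
    }
    let mut sorted = samples.to_vec();
    sorted.sort();
    let n = sorted.len();
    let total: Duration = sorted.iter().sum();
    // Nearest rank: ceil(0.95 * n), a 1-based rank in 1..=n.
    let rank = (n * 95 + 99) / 100;
    Some(Summary {
        min: sorted[0],
        max: sorted[n - 1],
        avg: total / n as u32,
        p95: sorted[rank - 1],
    })
}
```

Comparing `max` against `p95` is what makes the outlier-heavy runs stand out: 
a large gap between the two indicates a handful of slow iterations rather than 
a uniformly slower run.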
   
   `tokio_sync_file_test` runs a `spawn_blocking` task that decodes 
RecordBatches and sends them over an mpsc channel; a separate tokio task then 
performs the rest of the query. `tokio_par_sync_test` is broadly similar, but 
uses a regular tokio task instead of `spawn_blocking` for the reader.
   
   `tokio_par_async_spawn_blocking_test` and `tokio_par_async_blocking_test` 
use a tokio task to run the async `ParquetRecordBatchStream`, which 
communicates over an mpsc channel with a separate tokio task that performs the 
rest of the query. The difference is that 
`tokio_par_async_spawn_blocking_test` uses a separate `spawn_blocking` to read 
the column chunk data into the in-memory buffers, whereas 
`tokio_par_async_blocking_test` cheekily just uses blocking IO in a tokio 
worker thread.
   
   Immediately there is an obvious disparity between 
`tokio_par_async_spawn_blocking_test` and the other methods. Initially I 
thought this was the overhead of `spawn_blocking`, and in earlier incarnations 
it certainly was, but with the switch to the `Storage` interface there is now 
only a single `spawn_blocking` per row group (of which there are 2 in this 
file).
   
   I therefore added a load of instrumentation and noticed something strange: 
the performance variance in `tokio_par_async_spawn_blocking_test` is entirely 
in the synchronous section that decodes RecordBatches from the already-read 
byte arrays. This immediately suggested some sort of socket-locality funkiness, 
so I ran the test restricted to a single CPU core.
   
   ```
   $  taskset 0x1 cargo run --release
   tokio_sync_file_test (2048): min: 0.0593s, max: 0.0609s, avg: 0.0600s, p95: 0.0605s
   tokio_par_async_spawn_blocking_test (2048): min: 0.0619s, max: 0.0653s, avg: 0.0627s, p95: 0.0633s
   tokio_par_async_blocking_test (2048): min: 0.0619s, max: 0.0635s, avg: 0.0627s, p95: 0.0632s
   tokio_par_sync_test (2048): min: 0.0588s, max: 0.0609s, avg: 0.0595s, p95: 0.0600s
   ```
   
   Similarly, restricting the tokio worker pool to a single worker thread 
(note that `spawn_blocking` will still use a separate thread) gives:
   
   ```
   tokio_sync_file_test (2048): min: 0.0495s, max: 0.0510s, avg: 0.0499s, p95: 0.0503s
   tokio_par_async_spawn_blocking_test (2048): min: 0.0469s, max: 0.0618s, avg: 0.0491s, p95: 0.0495s
   tokio_par_async_blocking_test (2048): min: 0.0522s, max: 0.0534s, avg: 0.0526s, p95: 0.0531s
   tokio_par_sync_test (2048): min: 0.0497s, max: 0.0507s, avg: 0.0501s, p95: 0.0505s
   ```
   
   The performance across the board is actually better than when the worker 
pool had more threads, and the performance disparity between the approaches is 
largely eliminated.
   
   Whilst `perf` has to be taken with a grain of salt, as it doesn't properly 
understand tokio's userland threading, I captured a profile of a run that first 
performed `tokio_par_async_spawn_blocking_test` and then `tokio_par_sync_test`.
   
   
![image](https://user-images.githubusercontent.com/1781103/161268498-fff27578-4ef4-4570-b387-7fe77593c96f.png)
   
   We can clearly see work ping-ponging between threads for 
`tokio_par_async_spawn_blocking_test` and transitioning to a significantly more 
stable pattern for `tokio_par_sync_test`, which perhaps unsurprisingly performs 
better.
   
   Removing the second tokio task also results in the same improvements to 
workload variability and therefore average runtime performance.
   
   ```
   tokio_async_spawn_blocking_test (2048): min: 0.0557s, max: 0.0587s, avg: 0.0565s, p95: 0.0582s
   tokio_async_blocking_test (2048): min: 0.0576s, max: 0.0599s, avg: 0.0584s, p95: 0.0593s
   ```
   
   ## Manual Threading
   
   So it certainly looks like tokio is not scheduling our tasks well, but 
perhaps there is something else at play. I therefore experimented with 
dropping tokio and manually threading the workload.
   
   Here we can see the performance of a single-threaded execution against a 
file, and against data already in memory:
   
   ```
   sync_file_test (2048): min: 0.0541s, max: 0.0758s, avg: 0.0548s, p95: 0.0550s
   sync_mem_test (2048): min: 0.0585s, max: 0.0616s, avg: 0.0598s, p95: 0.0609s
   ```
   
   And for completeness, the performance of a blocking implementation using 
two threads:
   
   ```
   par_sync_file_test (2048): min: 0.0542s, max: 0.0584s, avg: 0.0563s, p95: 0.0572s
   ```
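
To make the shape of the two-thread blocking variant concrete, here is a 
minimal stdlib-only sketch. It is not the scheduler-bench code: `Batch`, 
`decode_batches` and `run_pipeline` are hypothetical stand-ins, the "decode" 
step just generates integers, the "rest of the query" is a plain sum, and the 
channel capacity of 2 is an arbitrary assumption.

```rust
use std::sync::mpsc;
use std::thread;

/// Stand-in for a decoded RecordBatch (hypothetical; a real one holds Arrow arrays).
type Batch = Vec<i64>;

/// Stand-in for the CPU-bound decode step: yields `num_batches` batches,
/// each containing `batch_size` consecutive integers.
fn decode_batches(num_batches: usize, batch_size: usize) -> impl Iterator<Item = Batch> {
    (0..num_batches).map(move |b| {
        let start = (b * batch_size) as i64;
        (start..start + batch_size as i64).collect()
    })
}

/// Decode on a dedicated OS thread and run the "rest of the query"
/// (here just a sum) on the calling thread.
fn run_pipeline(num_batches: usize, batch_size: usize) -> i64 {
    // Bounded channel so the decoder cannot run arbitrarily far ahead.
    let (tx, rx) = mpsc::sync_channel::<Batch>(2);

    let producer = thread::spawn(move || {
        for batch in decode_batches(num_batches, batch_size) {
            // Stop early if the consumer has gone away.
            if tx.send(batch).is_err() {
                break;
            }
        }
        // `tx` is dropped here, which ends the consumer's loop.
    });

    let mut total = 0i64;
    for batch in rx {
        total += batch.iter().sum::<i64>();
    }
    producer.join().expect("decoder thread panicked");
    total
}
```

Each stage here stays on one OS thread for its whole lifetime, which is the 
property the tokio variants lose when work migrates between worker threads.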
   
   ## Conclusions
   
   I think this data is consistent with the following conclusions:
   
   * The performance of this query is largely dominated by the CPU-bound task 
of decoding the parquet bytes
   * There are overheads associated with buffering up parquet data ahead of 
time, on the order of ~5ms
   * There are CPU-locality effects on the order of ~30ms that impact the 
performance of decoding the parquet bytes
   
   I will come back to this next week with fresh eyes, but if the above is 
correct it would have the following implications:
   
   * The advantages of a fully async parquet implementation are likely to be 
undermined by the performance cost associated with the loss of thread-locality
   * Using tokio to schedule CPU-bound work within DataFusion is likely to be 
significantly less optimal than we anticipated, especially for stateful 
operators
   
   

