jorgecarleitao opened a new pull request #8473:
URL: https://github.com/apache/arrow/pull/8473


   Recently, we introduced `async` to `execute`. This allowed us to parallelize 
multiple partitions, since we treat the execution of a part (of a partition) as 
the unit of work. However, a part is often a large task comprising multiple 
batches and steps.
   
   This PR makes all our execution nodes return a 
[futures::Stream](https://docs.rs/futures/0.3.6/futures/stream/trait.Stream.html)
 instead of an Iterator. For reference, a Stream is the asynchronous 
counterpart of an Iterator: each item is obtained by awaiting a future, and in 
this case each item is a `Result<RecordBatch>`.
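
To make the difference from an Iterator concrete, here is a minimal std-only sketch. `Batch`, `BatchStream`, `VecStream`, `drain` and `drain_demo` are hypothetical stand-ins, not DataFusion or `futures` APIs; the trait only mirrors the shape of `futures::Stream::poll_next`, where each poll may report "not ready yet", an item, or the end of the stream.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for `Result<RecordBatch>`.
type Batch = Result<Vec<i32>, String>;

// Same shape as `futures::Stream::poll_next`, reduced to a single item type.
trait BatchStream {
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Batch>>;
}

// A stream over in-memory batches that yields control once before every item,
// imitating a source that is not immediately ready.
struct VecStream {
    batches: std::vec::IntoIter<Batch>,
    waited: bool,
}

impl BatchStream for VecStream {
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Batch>> {
        if !self.waited {
            self.waited = true;
            // Ask to be polled again; a real source would wake on I/O readiness.
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        self.waited = false;
        Poll::Ready(self.batches.next())
    }
}

// A waker that does nothing; sufficient for the busy-polling loop below.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Polls the stream to completion and counts how often it returned `Pending`,
// i.e. how many times an executor could have run another task in between.
fn drain(mut stream: VecStream) -> (Vec<Batch>, usize) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut out = Vec::new();
    let mut pendings = 0;
    loop {
        match Pin::new(&mut stream).poll_next(&mut cx) {
            Poll::Pending => pendings += 1,
            Poll::Ready(Some(batch)) => out.push(batch),
            Poll::Ready(None) => return (out, pendings),
        }
    }
}

fn drain_demo() -> (Vec<Batch>, usize) {
    let batches: Vec<Batch> = vec![Ok(vec![1, 2]), Ok(vec![3])];
    drain(VecStream { batches: batches.into_iter(), waited: false })
}
```

With an Iterator, the caller blocks inside `next()` until the batch exists. Here every `Pending` is a point where the executor is free to schedule other work, which is what makes a single batch the unit of work.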
   
   This effectively breaks the execution into smaller units of work (where an 
individual unit is an operation that returns a `Result<RecordBatch>`), allowing 
each task to chew on smaller bits.
   
   This adds `futures` as a direct dependency of DataFusion (it was only a 
dev-dependency).
   
   This leads to a +10% regression in aggregates in micro-benchmarks, which 
IMO is expected given that there is more context switching to handle. However, 
I expect (hope?) this overhead to be independent of the number of batches and 
partitions, and to be offset by any async work we perform on our sources 
(readers) and sinks (writers).
   
   I did not take the time to optimize - the primary goal was to implement the 
idea, have it compile and pass tests, and have some discussion about it. I 
expect that we should be able to replace some of our operations by `join_all`, 
thereby scheduling multiple tasks at once (instead of waiting one by one).
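
As a rough illustration of the `join_all` idea, the std-only sketch below polls several futures in the same loop so that they make progress together instead of being awaited one by one. `Countdown`, `poll_all` and `join_demo` are hypothetical names made up for this example; in the PR itself one would presumably reach for `futures::future::join_all`, which relies on proper wake-ups rather than a busy loop.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// A waker that does nothing; sufficient for the busy-polling loop below.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// A future that returns `Pending` `remaining` times, then resolves to `value`.
struct Countdown {
    remaining: u32,
    value: u32,
}

impl Future for Countdown {
    type Output = u32;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        if self.remaining == 0 {
            return Poll::Ready(self.value);
        }
        self.remaining -= 1;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

// Polls every unfinished future once per round until all have completed.
// Returns the outputs in input order together with the number of rounds.
fn poll_all<F: Future + Unpin>(mut futures: Vec<F>) -> (Vec<F::Output>, usize) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut outputs: Vec<Option<F::Output>> = futures.iter().map(|_| None).collect();
    let mut rounds = 0;
    while outputs.iter().any(Option::is_none) {
        rounds += 1;
        for (fut, slot) in futures.iter_mut().zip(outputs.iter_mut()) {
            // A completed future must not be polled again.
            if slot.is_none() {
                if let Poll::Ready(v) = Pin::new(fut).poll(&mut cx) {
                    *slot = Some(v);
                }
            }
        }
    }
    (outputs.into_iter().map(|o| o.unwrap()).collect(), rounds)
}

fn join_demo() -> (Vec<u32>, usize) {
    poll_all(vec![
        Countdown { remaining: 2, value: 10 },
        Countdown { remaining: 0, value: 20 },
        Countdown { remaining: 1, value: 30 },
    ])
}
```

In this toy setup the three futures finish after three rounds, the cost of the slowest one, whereas awaiting them one after another would need six polls in total.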
   
   <details>
    <summary>Benchmarks</summary>
   
   Aggregates:
   ```
   aggregate_query_no_group_by 15 12
                           time:   [908.71 us 961.16 us 1.0193 ms]
                           change: [+5.9644% +10.567% +15.382%] (p = 0.00 < 0.05)
                           Performance has regressed.
   Found 7 outliers among 100 measurements (7.00%)
     2 (2.00%) high mild
     5 (5.00%) high severe

   aggregate_query_group_by 15 12
                           time:   [6.6902 ms 7.0747 ms 7.5420 ms]
                           change: [+4.5521% +10.510% +18.352%] (p = 0.00 < 0.05)
                           Performance has regressed.
   Found 8 outliers among 100 measurements (8.00%)
     2 (2.00%) high mild
     6 (6.00%) high severe

   aggregate_query_group_by_with_filter 15 12
                           time:   [2.8901 ms 2.9207 ms 2.9531 ms]
                           change: [-16.357% -8.7619% -2.2536%] (p = 0.01 < 0.05)
                           Performance has improved.
   Found 3 outliers among 100 measurements (3.00%)
     3 (3.00%) high mild
   ```
   
   Math:
   ```
   sqrt_20_9               time:   [6.9844 ms 7.0582 ms 7.1363 ms]
                           change: [+0.0557% +1.5625% +3.0408%] (p = 0.05 < 0.05)
                           Change within noise threshold.
   Found 3 outliers among 100 measurements (3.00%)
     2 (2.00%) high mild
     1 (1.00%) high severe

   sqrt_20_12              time:   [2.8350 ms 2.9504 ms 3.1204 ms]
                           change: [+3.8751% +8.2857% +14.671%] (p = 0.00 < 0.05)
                           Performance has regressed.
   Found 5 outliers among 100 measurements (5.00%)
     2 (2.00%) high mild
     3 (3.00%) high severe

   sqrt_22_12              time:   [14.888 ms 15.242 ms 15.620 ms]
                           change: [+7.6388% +10.709% +14.098%] (p = 0.00 < 0.05)
                           Performance has regressed.
   Found 5 outliers among 100 measurements (5.00%)
     3 (3.00%) high mild
     2 (2.00%) high severe

   sqrt_22_14              time:   [23.710 ms 23.817 ms 23.953 ms]
                           change: [-4.3401% -3.1824% -2.0952%] (p = 0.00 < 0.05)
                           Performance has improved.
   Found 11 outliers among 100 measurements (11.00%)
     5 (5.00%) high mild
     6 (6.00%) high severe
   ```
   </details>
   
   I admit that this is a bit outside my comfort zone, and someone with more 
experience in `async/await` could be of help.
   
   IMO this would integrate very nicely with ARROW-10307 and ARROW-9275. I 
_think_ it would also help ARROW-9707, and I _think_ that it also opens the 
possibility of consuming / producing batches from/to sources and sinks from 
Flight.

