jorgecarleitao opened a new pull request #8473: URL: https://github.com/apache/arrow/pull/8473
Recently, we introduced `async` to `execute`. This allowed us to parallelize multiple partitions, since we treat the execution of a part (of a partition) as the unit of work. However, a part is often a large task composed of multiple batches and steps.

This PR makes all our execution nodes return a [futures::Stream](https://docs.rs/futures/0.3.6/futures/stream/trait.Stream.html) instead of an Iterator. For reference, a Stream is the asynchronous counterpart of an iterator: each item is produced by a future, which in this case resolves to a `Result<RecordBatch>`. This effectively breaks the execution into smaller units of work (where an individual unit is an operation that returns a `Result<RecordBatch>`), allowing each task to chew smaller bits.

This adds `futures` as a direct dependency of DataFusion (it was previously only a dev-dependency).

This leads to a degradation of about 10% in aggregates in micro-benchmarks, which IMO is expected given that there is more context switching to handle. However, I expect (hope?) this overhead to be independent of the number of batches and partitions, and to be offset by any async work we perform on our sources (readers) and sinks (writers).

I did not take the time to optimize; the primary goal was to implement the idea, have it compile and pass tests, and have some discussion about it. I expect that we should be able to replace some of our operations with `join_all`, thereby scheduling multiple tasks at once (instead of waiting for them one by one).

<details>
<summary>Benchmarks</summary>

Aggregates:

```
aggregate_query_no_group_by 15 12
                        time:   [908.71 us 961.16 us 1.0193 ms]
                        change: [+5.9644% +10.567% +15.382%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 7 outliers among 100 measurements (7.00%)
  2 (2.00%) high mild
  5 (5.00%) high severe

aggregate_query_group_by 15 12
                        time:   [6.6902 ms 7.0747 ms 7.5420 ms]
                        change: [+4.5521% +10.510% +18.352%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 8 outliers among 100 measurements (8.00%)
  2 (2.00%) high mild
  6 (6.00%) high severe

aggregate_query_group_by_with_filter 15 12
                        time:   [2.8901 ms 2.9207 ms 2.9531 ms]
                        change: [-16.357% -8.7619% -2.2536%] (p = 0.01 < 0.05)
                        Performance has improved.
Found 3 outliers among 100 measurements (3.00%)
  3 (3.00%) high mild
```

Math:

```
sqrt_20_9               time:   [6.9844 ms 7.0582 ms 7.1363 ms]
                        change: [+0.0557% +1.5625% +3.0408%] (p = 0.05 < 0.05)
                        Change within noise threshold.
Found 3 outliers among 100 measurements (3.00%)
  2 (2.00%) high mild
  1 (1.00%) high severe

sqrt_20_12              time:   [2.8350 ms 2.9504 ms 3.1204 ms]
                        change: [+3.8751% +8.2857% +14.671%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 5 outliers among 100 measurements (5.00%)
  2 (2.00%) high mild
  3 (3.00%) high severe

sqrt_22_12              time:   [14.888 ms 15.242 ms 15.620 ms]
                        change: [+7.6388% +10.709% +14.098%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 5 outliers among 100 measurements (5.00%)
  3 (3.00%) high mild
  2 (2.00%) high severe

sqrt_22_14              time:   [23.710 ms 23.817 ms 23.953 ms]
                        change: [-4.3401% -3.1824% -2.0952%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 11 outliers among 100 measurements (11.00%)
  5 (5.00%) high mild
  6 (6.00%) high severe
```

</details>

I admit that this is a bit outside my comfort zone, and someone with more experience in `async/await` could be of help.

IMO this would integrate very nicely with ARROW-10307 and ARROW-9275, I _think_ it would also help ARROW-9707, and I _think_ that it also opens the possibility of consuming / producing batches from/to sources and sinks from Flight.
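To make the "stream of `Result<RecordBatch>`" idea from the description concrete, here is a minimal sketch that uses only the standard library. It is not the code in this PR: `BatchStream` is a local trait that merely mirrors the shape of `futures::Stream::poll_next`, and `Batch`, `MemoryStream`, `Doubled`, `NoopWaker` and `collect` are hypothetical names invented for illustration. The point is only that each call to `poll_next` is one small unit of work that yields a single batch, and that an operator can wrap its input and transform one batch at a time.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for an Arrow `RecordBatch`: a single column of integers.
type Batch = Vec<i32>;
type BatchResult = Result<Batch, String>;

// Local trait with the same shape as `futures::Stream::poll_next`,
// defined here so that the sketch needs nothing beyond std.
trait BatchStream {
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<BatchResult>>;
}

// Hypothetical source that hands out its in-memory batches one at a time.
struct MemoryStream {
    batches: std::vec::IntoIter<Batch>,
}

impl BatchStream for MemoryStream {
    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<BatchResult>> {
        // Always ready in this sketch; a real reader could return
        // `Poll::Pending` while it waits on I/O.
        Poll::Ready(self.batches.next().map(Ok))
    }
}

// Hypothetical operator: doubles every value of each batch coming from its input.
struct Doubled<S> {
    input: S,
}

impl<S: BatchStream + Unpin> BatchStream for Doubled<S> {
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<BatchResult>> {
        match Pin::new(&mut self.input).poll_next(cx) {
            Poll::Ready(Some(Ok(batch))) => {
                Poll::Ready(Some(Ok(batch.into_iter().map(|v| v * 2).collect())))
            }
            // Errors, end of stream and `Pending` are passed through unchanged.
            other => other,
        }
    }
}

// Waker that does nothing; sufficient for the busy-polling driver below.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Tiny driver that polls a stream to completion, stopping at the first error.
fn collect<S: BatchStream + Unpin>(mut stream: S) -> Result<Vec<Batch>, String> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut out = Vec::new();
    loop {
        match Pin::new(&mut stream).poll_next(&mut cx) {
            Poll::Ready(Some(batch)) => out.push(batch?),
            Poll::Ready(None) => return Ok(out),
            Poll::Pending => std::thread::yield_now(),
        }
    }
}

fn main() {
    let source = MemoryStream { batches: vec![vec![1, 2], vec![3]].into_iter() };
    let batches = collect(Doubled { input: source }).unwrap();
    assert_eq!(batches, vec![vec![2, 4], vec![6]]);
}
```

In a real setting the driver would be an async runtime rather than the busy loop in `collect`, and the trait would be `futures::Stream` itself; the sketch only shows where the per-batch scheduling points come from.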
