aocsa commented on pull request #11019: URL: https://github.com/apache/arrow/pull/11019#issuecomment-913977337
Thanks for the feedback, @lidavidm.

> Thanks for the updates.
>
> We may actually want this to be implemented multiple ways: as both a hash aggregate kernel and a vector kernel. We'll want the hash aggregate kernel to match Pandas, where [nlargest](https://pandas.pydata.org/docs/reference/api/pandas.core.groupby.SeriesGroupBy.nlargest.html) is part of a group by operation. Of course, we can split out the implementation of that for later, but I think it's worth considering how we can share the implementations as much as possible. (It may not be that you can share much, since the hash aggregate kernel has to work in a streaming fashion which this does not do.)
>
> We'll still want a vector kernel implementation (the one here) as essentially a pipeline sink, since this implements an `... ORDER BY ... LIMIT` operation. (We'll need to implement an ExecNode for that - that can be a separate follow up JIRA, it should be straightforward.) Note if you do want to match the API of SortIndices this should return indices, not values.

Update: I changed these APIs to return only indices, like SortIndices.

> Also, note there are some CI failures - mostly just different compilers looking for different things.
>
> It might be good to compare the kernel here with SortIndices (since you could emulate a top-K by calling SortIndices, slicing the output, then calling Take) - I wonder how much of a speedup this achieves.

Here are the benchmark results for top-k with SortIndices (`TopK_SortingIndices`) and top-k with a non-stable heap (`TopK_NonStableHeap`). The values are int64. The inputs are 32768 (1 << 15) elements with null proportions from 0% to 100%, plus 1 << 20 and 1 << 23 elements with 1% nulls (a `null_proportion` argument of 100, i.e. 1/100). k is N/8.

`TopK_SortingIndices`

```
-----------------------------------------------------------------------------------------------
Benchmark                                       Time             CPU Iterations UserCounters...
-----------------------------------------------------------------------------------------------
TopKInt64/32768/10000/min_time:1.000       199633 ns       199601 ns       7010 bytes_per_second=156.563M/s items_per_second=20.521M/s null_percent=0.01 size=32.768k
TopKInt64/32768/100/min_time:1.000         205520 ns       205486 ns       6734 bytes_per_second=152.078M/s items_per_second=19.9332M/s null_percent=1 size=32.768k
TopKInt64/32768/10/min_time:1.000          196022 ns       195979 ns       7124 bytes_per_second=159.456M/s items_per_second=20.9002M/s null_percent=10 size=32.768k
TopKInt64/32768/2/min_time:1.000           124550 ns       124531 ns      11204 bytes_per_second=250.941M/s items_per_second=32.8914M/s null_percent=50 size=32.768k
TopKInt64/32768/1/min_time:1.000             9395 ns         9394 ns     149397 bytes_per_second=3.24871G/s items_per_second=436.034M/s null_percent=100 size=32.768k
TopKInt64/32768/0/min_time:1.000           199116 ns       199086 ns       7016 bytes_per_second=156.968M/s items_per_second=20.5741M/s null_percent=0 size=32.768k
TopKInt64/1048576/100/min_time:1.000     10200499 ns     10199057 ns        137 bytes_per_second=98.0483M/s items_per_second=12.8514M/s null_percent=1 size=1048.58k
TopKInt64/8388608/100/min_time:1.000    105033049 ns    105007534 ns         13 bytes_per_second=76.185M/s items_per_second=9.98572M/s null_percent=1 size=8.38861M
```

`TopK_NonStableHeap`

```
-----------------------------------------------------------------------------------------------
Benchmark                                       Time             CPU Iterations UserCounters...
-----------------------------------------------------------------------------------------------
TopKInt64/32768/10000/min_time:1.000        74202 ns        74191 ns      18553 bytes_per_second=421.208M/s items_per_second=55.2085M/s null_percent=0.01 size=32.768k
TopKInt64/32768/100/min_time:1.000          78732 ns        78719 ns      17032 bytes_per_second=396.982M/s items_per_second=52.0333M/s null_percent=1 size=32.768k
TopKInt64/32768/10/min_time:1.000           79833 ns        79821 ns      17285 bytes_per_second=391.5M/s items_per_second=51.3147M/s null_percent=10 size=32.768k
TopKInt64/32768/2/min_time:1.000            71248 ns        71237 ns      19374 bytes_per_second=438.674M/s items_per_second=57.4979M/s null_percent=50 size=32.768k
TopKInt64/32768/1/min_time:1.000             4680 ns         4679 ns     301875 bytes_per_second=6.52247G/s items_per_second=875.431M/s null_percent=100 size=32.768k
TopKInt64/32768/0/min_time:1.000            73972 ns        73962 ns      18658 bytes_per_second=422.516M/s items_per_second=55.38M/s null_percent=0 size=32.768k
TopKInt64/1048576/100/min_time:1.000      5714238 ns      5713428 ns        247 bytes_per_second=175.026M/s items_per_second=22.941M/s null_percent=1 size=1048.58k
TopKInt64/8388608/100/min_time:1.000     67560407 ns     67546834 ns         20 bytes_per_second=118.436M/s items_per_second=15.5237M/s null_percent=1 size=8.38861M
```

These results show that `TopK_NonStableHeap` is considerably faster than `TopK_SortingIndices`: roughly 2.5x to 2.7x on the 32768-element inputs with up to 10% nulls, and about 1.8x and 1.6x on the 1 << 20 and 1 << 23 element inputs. However, the stable version built on a StableHeap does not perform as well as `TopK_NonStableHeap`, as you can see below:

`TopK_StableHeap`

```
-----------------------------------------------------------------------------------------------
Benchmark                                       Time             CPU Iterations UserCounters...
-----------------------------------------------------------------------------------------------
TopKInt64/32768/10000/min_time:1.000       213637 ns       213609 ns       6596 bytes_per_second=146.295M/s items_per_second=19.1752M/s null_percent=0.01 size=32.768k
TopKInt64/32768/100/min_time:1.000         222127 ns       222091 ns       6332 bytes_per_second=140.708M/s items_per_second=18.4429M/s null_percent=1 size=32.768k
TopKInt64/32768/10/min_time:1.000          216600 ns       216562 ns       6353 bytes_per_second=144.3M/s items_per_second=18.9137M/s null_percent=10 size=32.768k
TopKInt64/32768/2/min_time:1.000           186647 ns       186613 ns       7585 bytes_per_second=167.459M/s items_per_second=21.9492M/s null_percent=50 size=32.768k
TopKInt64/32768/1/min_time:1.000             8101 ns         8100 ns     173012 bytes_per_second=3.7677G/s items_per_second=505.692M/s null_percent=100 size=32.768k
TopKInt64/32768/0/min_time:1.000           213979 ns       213950 ns       6595 bytes_per_second=146.062M/s items_per_second=19.1447M/s null_percent=0 size=32.768k
TopKInt64/1048576/100/min_time:1.000     11343912 ns     11341499 ns        122 bytes_per_second=88.1718M/s items_per_second=11.5568M/s null_percent=1 size=1048.58k
TopKInt64/8388608/100/min_time:1.000    123205686 ns    123179459 ns         12 bytes_per_second=64.9459M/s items_per_second=8.51259M/s null_percent=1 size=8.38861M
```

This stable select_k implementation based on a stable heap is valid, but it is still simple, and it is slower than the SortIndices-based version in every case except the all-null input, so a sort-based algorithm may be the better choice for the stable variant. IMO, stable algorithms need more exploration; they could be implemented in a separate follow-up JIRA issue that adds an optional parameter to choose between `SortType{kStable, kNonStable}` implementations.

Besides these updates, instead of a single SortOrder for RecordBatch and Table, a new kernel called `Result select_k(datum, options)` was created, where `options.sort_keys` has type `std::vector<SortKey<string, order>>`.

Looking forward to your thoughts. cc @lidavidm
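For comparison with the benchmarks, here is a minimal plain-C++ sketch of the sort-based emulation described in the review (SortIndices, then a slice, then Take). It is not Arrow code: `TopKBySortIndices` and `TakeValues` are hypothetical helpers, a stable descending argsort stands in for `SortIndices`, and nulls are ignored.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <numeric>
#include <vector>

// Hypothetical helper (not an Arrow API): argsort all N values in descending
// order with a stable sort, then keep the first k indices. This mirrors
// "SortIndices, then slice": O(N log N) time and O(N) extra memory.
std::vector<int64_t> TopKBySortIndices(const std::vector<int64_t>& values, size_t k) {
  std::vector<int64_t> indices(values.size());
  std::iota(indices.begin(), indices.end(), int64_t{0});
  std::stable_sort(indices.begin(), indices.end(), [&](int64_t a, int64_t b) {
    return values[static_cast<size_t>(a)] > values[static_cast<size_t>(b)];
  });
  indices.resize(std::min(k, indices.size()));  // the "slice" step
  return indices;
}

// Hypothetical helper playing the role of Take: gather values at the indices.
std::vector<int64_t> TakeValues(const std::vector<int64_t>& values,
                                const std::vector<int64_t>& indices) {
  std::vector<int64_t> out;
  out.reserve(indices.size());
  for (int64_t i : indices) out.push_back(values[static_cast<size_t>(i)]);
  return out;
}
```

`TakeValues(v, TopKBySortIndices(v, k))` yields the k largest values; returning indices first matches the index-returning API the PR switched to. Sorting all N indices only to keep k of them is the O(N log N) cost that a heap-based kernel, at O(N log k), can avoid.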
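The heap-based approach can also be sketched in plain C++: keep a bounded heap of k indices whose top is the worst candidate retained so far, which gives O(N log k) time and O(k) extra memory. In this sketch the stable/non-stable choice is only a tie-break in the comparator. `SortType` is a hypothetical enum modeled on the `SortType{kStable, kNonStable}` option proposed in the comment, `HeapSelectK` is a hypothetical name, nulls are ignored, and this illustrates the technique rather than reproducing the PR's StableHeap or NonStableHeap code.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <queue>
#include <vector>

// Hypothetical option modeled on the proposed SortType{kStable, kNonStable}.
enum class SortType { kStable, kNonStable };

// Returns the indices of the k largest values, best first.
std::vector<int64_t> HeapSelectK(const std::vector<int64_t>& values, size_t k,
                                 SortType type) {
  k = std::min(k, values.size());
  if (k == 0) return {};
  // True when index a ranks ahead of index b. For kNonStable, equal values are
  // unordered; for kStable, the lower original index wins the tie.
  auto ranks_before = [&](int64_t a, int64_t b) {
    const int64_t va = values[static_cast<size_t>(a)];
    const int64_t vb = values[static_cast<size_t>(b)];
    if (va != vb) return va > vb;
    return type == SortType::kStable && a < b;
  };
  // priority_queue keeps the "largest" element under its comparator on top;
  // with ranks_before that is the worst-ranked index retained so far.
  std::priority_queue<int64_t, std::vector<int64_t>, decltype(ranks_before)> heap(
      ranks_before);
  for (int64_t i = 0; i < static_cast<int64_t>(values.size()); ++i) {
    if (heap.size() < k) {
      heap.push(i);
    } else if (ranks_before(i, heap.top())) {
      heap.pop();  // evict the current worst, admit the better candidate
      heap.push(i);
    }
  }
  // The heap yields the worst index first, so fill the output from the back.
  std::vector<int64_t> out(k);
  for (size_t j = k; j-- > 0;) {
    out[j] = heap.top();
    heap.pop();
  }
  return out;
}
```

With `kStable`, the result equals the first k indices of a stable descending sort; with `kNonStable`, indices of equal values may come back in any order.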
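The multi-key `select_k` (with `options.sort_keys` as a vector of name/order pairs) comes down to comparing rows lexicographically, one key at a time. The sketch below uses hypothetical `Key`, `Order`, and `Table` types in place of Arrow's `SortKey`, sort order, and RecordBatch/Table. It is not the Arrow API, it supports int64 columns only, it ignores nulls, and because it uses `std::partial_sort` it is not stable.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <numeric>
#include <string>
#include <vector>

// Hypothetical stand-ins, NOT Arrow's SortKey / SortOrder / Table types.
enum class Order { kAscending, kDescending };
struct Key {
  std::string name;  // column to sort by
  Order order;
};
struct Table {
  std::vector<std::string> names;             // column names
  std::vector<std::vector<int64_t>> columns;  // equal-length int64 columns
};

// Returns the indices of the first k rows under the lexicographic order given
// by `keys` (keys[0] is most significant). Rows tied on all keys: any order.
std::vector<int64_t> SelectKRows(const Table& table, const std::vector<Key>& keys,
                                 size_t k) {
  // Resolve each key name to its column once, up front.
  std::vector<const std::vector<int64_t>*> cols;
  for (const Key& key : keys) {
    auto it = std::find(table.names.begin(), table.names.end(), key.name);
    assert(it != table.names.end() && "unknown sort key");  // precondition
    cols.push_back(&table.columns[static_cast<size_t>(it - table.names.begin())]);
  }
  const size_t n = table.columns.empty() ? 0 : table.columns[0].size();
  std::vector<int64_t> indices(n);
  std::iota(indices.begin(), indices.end(), int64_t{0});
  auto ranks_before = [&](int64_t a, int64_t b) {
    for (size_t i = 0; i < keys.size(); ++i) {
      const int64_t x = (*cols[i])[static_cast<size_t>(a)];
      const int64_t y = (*cols[i])[static_cast<size_t>(b)];
      if (x == y) continue;  // tie on this key: consult the next one
      return keys[i].order == Order::kAscending ? x < y : x > y;
    }
    return false;  // equal on every key
  };
  k = std::min(k, n);
  // partial_sort orders only the first k positions, in about N log k comparisons.
  const auto mid = indices.begin() + static_cast<std::ptrdiff_t>(k);
  std::partial_sort(indices.begin(), mid, indices.end(), ranks_before);
  indices.resize(k);
  return indices;
}
```

For example, with keys `a` descending then `b` ascending, the rows (a, b) = (1, 10), (2, 20), (1, 30), (3, 40), (2, 50) and k = 3 select rows 3, 1, 4.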
