aocsa commented on pull request #11019:
URL: https://github.com/apache/arrow/pull/11019#issuecomment-913977337


   Thanks for the feedback @lidavidm 
   
   > Thanks for the updates.
   > 
   > We may actually want this to be implemented multiple ways: as both a hash 
aggregate kernel and a vector kernel. We'll want the hash aggregate kernel to 
match Pandas, where 
[nlargest](https://pandas.pydata.org/docs/reference/api/pandas.core.groupby.SeriesGroupBy.nlargest.html)
 is part of a group by operation. Of course, we can split out the 
implementation of that for later, but I think it's worth considering how we can 
share the implementations as much as possible. (It may not be that you can 
share much, since the hash aggregate kernel has to work in a streaming fashion 
which this does not do.)
   > 
   > We'll still want a vector kernel implementation (the one here) as 
essentially a pipeline sink, since this implements an `... ORDER BY ... LIMIT` 
operation. (We'll need to implement an ExecNode for that - that can be a 
separate follow up JIRA, it should be straightforward.) Note if you do want to 
match the API of SortIndices this should return indices, not values.
   > 
   
   Update: I changed these APIs to return only indices like SortIndices.
   
   > Also, note there are some CI failures - mostly just different compilers 
looking for different things.
   > 
   > It might be good to compare the kernel here with SortIndices (since you 
could emulate a top-K by calling SortIndices, slicing the output, then calling 
Take) - I wonder how much of a speedup this achieves.
   
   Here are the benchmark results for `TopK_SortingIndices` and
   `TopK_NonStableHeap`. Values are int64, with lengths from 1 << 20 to 1 << 23
   at a null_proportion argument of 100 (null_percent=1), plus 32768-element
   inputs at varying null proportions; k is N/8:
   
   
   `TopK_SortingIndices`
   ```
   -----------------------------------------------------------------------------------------------
   Benchmark                                     Time             CPU   Iterations UserCounters...
   -----------------------------------------------------------------------------------------------
   TopKInt64/32768/10000/min_time:1.000     199633 ns       199601 ns         7010 bytes_per_second=156.563M/s items_per_second=20.521M/s null_percent=0.01 size=32.768k
   TopKInt64/32768/100/min_time:1.000       205520 ns       205486 ns         6734 bytes_per_second=152.078M/s items_per_second=19.9332M/s null_percent=1 size=32.768k
   TopKInt64/32768/10/min_time:1.000        196022 ns       195979 ns         7124 bytes_per_second=159.456M/s items_per_second=20.9002M/s null_percent=10 size=32.768k
   TopKInt64/32768/2/min_time:1.000         124550 ns       124531 ns        11204 bytes_per_second=250.941M/s items_per_second=32.8914M/s null_percent=50 size=32.768k
   TopKInt64/32768/1/min_time:1.000           9395 ns         9394 ns       149397 bytes_per_second=3.24871G/s items_per_second=436.034M/s null_percent=100 size=32.768k
   TopKInt64/32768/0/min_time:1.000         199116 ns       199086 ns         7016 bytes_per_second=156.968M/s items_per_second=20.5741M/s null_percent=0 size=32.768k
   TopKInt64/1048576/100/min_time:1.000   10200499 ns     10199057 ns          137 bytes_per_second=98.0483M/s items_per_second=12.8514M/s null_percent=1 size=1048.58k
   TopKInt64/8388608/100/min_time:1.000  105033049 ns    105007534 ns           13 bytes_per_second=76.185M/s items_per_second=9.98572M/s null_percent=1 size=8.38861M
   ```
   
   `TopK_NonStableHeap`
   ```
   -----------------------------------------------------------------------------------------------
   Benchmark                                     Time             CPU   Iterations UserCounters...
   -----------------------------------------------------------------------------------------------
   TopKInt64/32768/10000/min_time:1.000      74202 ns        74191 ns        18553 bytes_per_second=421.208M/s items_per_second=55.2085M/s null_percent=0.01 size=32.768k
   TopKInt64/32768/100/min_time:1.000        78732 ns        78719 ns        17032 bytes_per_second=396.982M/s items_per_second=52.0333M/s null_percent=1 size=32.768k
   TopKInt64/32768/10/min_time:1.000         79833 ns        79821 ns        17285 bytes_per_second=391.5M/s items_per_second=51.3147M/s null_percent=10 size=32.768k
   TopKInt64/32768/2/min_time:1.000          71248 ns        71237 ns        19374 bytes_per_second=438.674M/s items_per_second=57.4979M/s null_percent=50 size=32.768k
   TopKInt64/32768/1/min_time:1.000           4680 ns         4679 ns       301875 bytes_per_second=6.52247G/s items_per_second=875.431M/s null_percent=100 size=32.768k
   TopKInt64/32768/0/min_time:1.000          73972 ns        73962 ns        18658 bytes_per_second=422.516M/s items_per_second=55.38M/s null_percent=0 size=32.768k
   TopKInt64/1048576/100/min_time:1.000    5714238 ns      5713428 ns          247 bytes_per_second=175.026M/s items_per_second=22.941M/s null_percent=1 size=1048.58k
   TopKInt64/8388608/100/min_time:1.000   67560407 ns     67546834 ns           20 bytes_per_second=118.436M/s items_per_second=15.5237M/s null_percent=1 size=8.38861M
   ```
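
For readers unfamiliar with the two strategies being compared, here is a minimal pure-Python sketch of the idea. It is not the Arrow C++ code from this PR: the function names `top_k_by_sort` and `top_k_by_heap` are made up for illustration, and nulls are modeled as `None` and simply skipped, which is an assumption. The first function sorts all indices, slices the first k, then takes the values; the second keeps a bounded min-heap of size k.

```python
import heapq


def top_k_by_sort(values, k):
    """Top-k largest values via a full sort of indices, a slice, and a take."""
    # Full index sort over the non-null entries, descending by value.
    non_null = [i for i, v in enumerate(values) if v is not None]
    order = sorted(non_null, key=lambda i: values[i], reverse=True)
    top = order[:max(k, 0)]              # slice the sorted indices
    return [values[i] for i in top]      # "take" the selected values


def top_k_by_heap(values, k):
    """Top-k largest values via a min-heap that never grows beyond k entries."""
    if k <= 0:
        return []
    heap = []  # root is the smallest of the current k candidates
    for v in values:
        if v is None:
            continue
        if len(heap) < k:
            heapq.heappush(heap, v)
        elif v > heap[0]:
            # New value beats the weakest candidate: replace the root.
            heapq.heapreplace(heap, v)
    return sorted(heap, reverse=True)
```

The sort-based path does O(N log N) comparisons regardless of k, while the heap path does O(N log k), which is one plausible reason a heap-based top-K can come out ahead when k is a fraction of N.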
   
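   These results show that `TopK_NonStableHeap` is considerably faster than
   `TopK_SortingIndices`: around 2.7x on the 32768-element inputs with few
   nulls, and roughly 1.5x to 1.8x on the larger inputs. However, a stable
   version built on a StableHeap does not perform as well as
   `TopK_NonStableHeap`, as you can see below: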
   
   `TopK_StableHeap`
   ```
   -----------------------------------------------------------------------------------------------
   Benchmark                                     Time             CPU   Iterations UserCounters...
   -----------------------------------------------------------------------------------------------
   TopKInt64/32768/10000/min_time:1.000     213637 ns       213609 ns         6596 bytes_per_second=146.295M/s items_per_second=19.1752M/s null_percent=0.01 size=32.768k
   TopKInt64/32768/100/min_time:1.000       222127 ns       222091 ns         6332 bytes_per_second=140.708M/s items_per_second=18.4429M/s null_percent=1 size=32.768k
   TopKInt64/32768/10/min_time:1.000        216600 ns       216562 ns         6353 bytes_per_second=144.3M/s items_per_second=18.9137M/s null_percent=10 size=32.768k
   TopKInt64/32768/2/min_time:1.000         186647 ns       186613 ns         7585 bytes_per_second=167.459M/s items_per_second=21.9492M/s null_percent=50 size=32.768k
   TopKInt64/32768/1/min_time:1.000           8101 ns         8100 ns       173012 bytes_per_second=3.7677G/s items_per_second=505.692M/s null_percent=100 size=32.768k
   TopKInt64/32768/0/min_time:1.000         213979 ns       213950 ns         6595 bytes_per_second=146.062M/s items_per_second=19.1447M/s null_percent=0 size=32.768k
   TopKInt64/1048576/100/min_time:1.000   11343912 ns     11341499 ns          122 bytes_per_second=88.1718M/s items_per_second=11.5568M/s null_percent=1 size=1048.58k
   TopKInt64/8388608/100/min_time:1.000  123205686 ns    123179459 ns           12 bytes_per_second=64.9459M/s items_per_second=8.51259M/s null_percent=1 size=8.38861M
   ```
   
   This stable select_k implementation, built on a stable heap, is valid but
   still simple, and a sort-based algorithm may be the better choice here. IMO,
   stable algorithms need more exploration and could be handled in a separate
   follow-up JIRA issue that adds an optional parameter, such as
   `SortType{kStable, kNonStable}`, to choose between the implementations.
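
To make "stable" concrete, here is a small pure-Python sketch, again not the code in this PR. It assumes a stable select-k means that among equal values the row that appears earlier in the input wins and is listed first. The names `select_k_stable_sort` and `select_k_stable_heap` are hypothetical, and nulls are not handled. The heap variant gets stability by adding the row index as a tie-breaker in every comparison, which hints at where the extra cost of a stable heap may come from.

```python
import heapq


def select_k_stable_sort(values, k):
    """Indices of the k largest values; ties keep input order (sort-based)."""
    # Python's sort is stable even with reverse=True, so equal values
    # stay in their original relative order.
    order = sorted(range(len(values)), key=lambda i: values[i], reverse=True)
    return order[:max(k, 0)]


def select_k_stable_heap(values, k):
    """Same result using a bounded min-heap with an index tie-breaker."""
    if k <= 0:
        return []
    heap = []  # entries are (value, -index); the root is the weakest candidate
    for i, v in enumerate(values):
        item = (v, -i)  # for equal values, the earlier row compares greater
        if len(heap) < k:
            heapq.heappush(heap, item)
        elif item > heap[0]:
            heapq.heapreplace(heap, item)
    # Best first: larger value, then smaller index.
    return [-neg_i for _, neg_i in sorted(heap, reverse=True)]
```

Both functions return indices rather than values, in line with the SortIndices-style API mentioned earlier in this comment.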
   
   Besides these updates, instead of taking a single SortOrder for RecordBatch
   and Table, I created a new kernel, `Result select_k(datum, options)`, where
   `options.sort_keys` has type `std::vector<SortKey<string, order>>`.
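
As a rough illustration of what per-column sort keys mean for a table, here is a pure-Python sketch. It is only a model of the idea, not the kernel's implementation: the table is assumed to be a dict of equal-length columns, each sort key is a hypothetical `(column_name, order)` pair with order `"ascending"` or `"descending"`, and the function name `select_k` is reused purely for readability.

```python
from functools import cmp_to_key


def select_k(table, k, sort_keys):
    """Return the indices of the first k rows under the given sort keys.

    table: dict mapping column name -> list of values (equal lengths).
    sort_keys: list of (column_name, order) pairs, applied left to right.
    """
    for _, order in sort_keys:
        if order not in ("ascending", "descending"):
            raise ValueError(f"unknown sort order: {order!r}")
    num_rows = len(next(iter(table.values()))) if table else 0

    def compare(i, j):
        # The first key that differs decides the ordering of rows i and j.
        for name, order in sort_keys:
            a, b = table[name][i], table[name][j]
            if a != b:
                result = -1 if a < b else 1
                return result if order == "ascending" else -result
        return 0

    ordered = sorted(range(num_rows), key=cmp_to_key(compare))
    return ordered[:max(k, 0)]
```

For example, with columns `a = [1, 2, 2, 1]` and `b = [10, 20, 30, 40]`, sorting by `a` descending and then `b` ascending selects rows 1 and 2 for k = 2.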
   
   Looking forward to your thoughts. cc @lidavidm
   



