[ 
https://issues.apache.org/jira/browse/FLINK-13236?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-13236:
-----------------------------------
    Labels: pull-request-available  (was: )

> Fix bug and improve performance in TopNBuffer
> ---------------------------------------------
>
>                 Key: FLINK-13236
>                 URL: https://issues.apache.org/jira/browse/FLINK-13236
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: Caizhi Weng
>            Assignee: Caizhi Weng
>            Priority: Minor
>              Labels: pull-request-available
>
> In {{TopNBuffer}} we have the following method:
> {code:java}
> /**
>  * Puts a record list into the buffer under the sortKey.
>  * Note: if buffer already contains sortKey, putAll will overwrite the 
> previous value
>  *
>  * @param sortKey sort key with which the specified values are to be 
> associated
>  * @param values record lists to be associated with the specified key
>  */
> void putAll(BaseRow sortKey, Collection<BaseRow> values) {
>       treeMap.put(sortKey, values);
>       currentTopNum += values.size();
> }
> {code}
> When {{sortKey}} already exists in {{treeMap}}, the {{currentTopNum}} should 
> be first subtracted by the old {{value.size()}} in {{treeMap}} then added 
> (can't be directly added). As currently only {{AppendOnlyTopNFunction}} uses 
> this method in its init procedure, this bug is not triggered.
> {code:java}
> /**
>  * Gets record which rank is given value.
>  *
>  * @param rank rank value to search
>  * @return the record which rank is given value
>  */
> BaseRow getElement(int rank) {
>       int curRank = 0;
>       Iterator<Map.Entry<BaseRow, Collection<BaseRow>>> iter = 
> treeMap.entrySet().iterator();
>       while (iter.hasNext()) {
>               Map.Entry<BaseRow, Collection<BaseRow>> entry = iter.next();
>               Collection<BaseRow> list = entry.getValue();
>               Iterator<BaseRow> listIter = list.iterator();
>               while (listIter.hasNext()) {
>                       BaseRow elem = listIter.next();
>                       curRank += 1;
>                       if (curRank == rank) {
>                               return elem;
>                       }
>               }
>       }
>       return null;
> }
> {code}
> We can remove the inner loop by adding {{curRank}} by {{list.size()}} each 
> time.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)

Reply via email to