[ https://issues.apache.org/jira/browse/FLINK-13236?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated FLINK-13236: ----------------------------------- Labels: pull-request-available (was: ) > Fix bug and improve performance in TopNBuffer > --------------------------------------------- > > Key: FLINK-13236 > URL: https://issues.apache.org/jira/browse/FLINK-13236 > Project: Flink > Issue Type: Improvement > Reporter: Caizhi Weng > Assignee: Caizhi Weng > Priority: Minor > Labels: pull-request-available > > In {{TopNBuffer}} we have the following method: > {code:java} > /** > * Puts a record list into the buffer under the sortKey. > * Note: if buffer already contains sortKey, putAll will overwrite the > previous value > * > * @param sortKey sort key with which the specified values are to be > associated > * @param values record lists to be associated with the specified key > */ > void putAll(BaseRow sortKey, Collection<BaseRow> values) { > treeMap.put(sortKey, values); > currentTopNum += values.size(); > } > {code} > When {{sortKey}} already exists in {{treeMap}}, the {{currentTopNum}} should > be first subtracted by the old {{value.size()}} in {{treeMap}} then added > (can't be directly added). As currently only {{AppendOnlyTopNFunction}} uses > this method in its init procedure, this bug is not triggered. > {code:java} > /** > * Gets record which rank is given value. > * > * @param rank rank value to search > * @return the record which rank is given value > */ > BaseRow getElement(int rank) { > int curRank = 0; > Iterator<Map.Entry<BaseRow, Collection<BaseRow>>> iter = > treeMap.entrySet().iterator(); > while (iter.hasNext()) { > Map.Entry<BaseRow, Collection<BaseRow>> entry = iter.next(); > Collection<BaseRow> list = entry.getValue(); > Iterator<BaseRow> listIter = list.iterator(); > while (listIter.hasNext()) { > BaseRow elem = listIter.next(); > curRank += 1; > if (curRank == rank) { > return elem; > } > } > } > return null; > } > {code} > We can remove the inner loop by adding {{curRank}} by {{list.size()}} each > time. -- This message was sent by Atlassian JIRA (v7.6.14#76016)