lindong28 commented on code in PR #203:
URL: https://github.com/apache/flink-ml/pull/203#discussion_r1097198478
##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/lsh/LSHModel.java:
##########
@@ -373,34 +395,45 @@ public TopKFunction(String distCol, int numNearestNeighbors) {
}
@Override
- public PriorityQueue<Row> createAccumulator() {
- return new PriorityQueue<>(numNearestNeighbors, new DistColComparator(distCol));
+ public List<Row> createAccumulator() {
+ return new ArrayList<>(numNearestNeighbors);
}
@Override
- public PriorityQueue<Row> add(Row value, PriorityQueue<Row> accumulator) {
- if (accumulator.size() == numNearestNeighbors) {
- Row peek = accumulator.peek();
- if (accumulator.comparator().compare(value, peek) < 0) {
- accumulator.poll();
- }
+ public List<Row> add(Row value, List<Row> accumulator) {
+ if (topKRows == null) {
+ topKRows = new PriorityQueue<>(numNearestNeighbors, new DistColComparator(distCol));
+ topKRows.addAll(accumulator);
}
- accumulator.add(value);
- return accumulator;
+ insert(value, topKRows);
+ return new ArrayList<>(topKRows);
Review Comment:
Note that if the aggregator needs to be serializable by either `kryo` or
our custom serializer, we can define a dedicated container class that holds the
`PriorityQueue<Row>` as a member variable and define a custom serializer for it.
That way `add()` can update the queue in place instead of copying it into a new
`ArrayList` on every call.
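
A rough sketch of what I mean, in plain Java so it runs without Flink on the classpath. Everything here is hypothetical: `TopKAccumulator`, `Entry` (a stand-in for `Row` that carries only an id and the distance value), and the byte layout are made up for illustration. In the PR, the `serialize`/`deserialize` logic would presumably live in whatever serializer type we register for the accumulator.

```java
import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.PriorityQueue;

/** Hypothetical container: owns the bounded heap so add() can mutate it in place. */
class TopKAccumulator {

    /** Stand-in for Row (assumption): an id plus the value of the distance column. */
    static final class Entry {
        final long id;
        final double dist;

        Entry(long id, double dist) {
            this.id = id;
            this.dist = dist;
        }
    }

    final int k;

    // Max-heap on distance: the head is the farthest of the k nearest entries kept so far.
    final PriorityQueue<Entry> heap;

    TopKAccumulator(int k) {
        this.k = k;
        this.heap =
                new PriorityQueue<>(
                        k, Comparator.comparingDouble((Entry e) -> e.dist).reversed());
    }

    /** Keeps at most k entries; a new entry replaces the head only if it is closer. */
    void insert(Entry e) {
        if (heap.size() < k) {
            heap.add(e);
        } else if (e.dist < heap.peek().dist) {
            heap.poll();
            heap.add(e);
        }
    }

    /** Custom serialization: k, entry count, then (id, dist) pairs in heap iteration order. */
    byte[] serialize() {
        int entryBytes = Long.BYTES + Double.BYTES;
        ByteBuffer buf = ByteBuffer.allocate(2 * Integer.BYTES + heap.size() * entryBytes);
        buf.putInt(k).putInt(heap.size());
        for (Entry e : heap) {
            buf.putLong(e.id).putDouble(e.dist);
        }
        return buf.array();
    }

    /** Rebuilds the heap from the bytes written by serialize(). */
    static TopKAccumulator deserialize(byte[] data) {
        ByteBuffer buf = ByteBuffer.wrap(data);
        TopKAccumulator acc = new TopKAccumulator(buf.getInt());
        int size = buf.getInt();
        for (int i = 0; i < size; i++) {
            acc.heap.add(new Entry(buf.getLong(), buf.getDouble()));
        }
        return acc;
    }
}
```

With a container like this, `add()` would reduce to `accumulator.insert(value); return accumulator;`, and the per-record cost drops from copying up to k rows to a single O(log k) heap operation.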