lindong28 commented on code in PR #203:
URL: https://github.com/apache/flink-ml/pull/203#discussion_r1097134531


##########
flink-ml-lib/src/main/java/org/apache/flink/ml/common/util/quantile/QuantileSummary.java:
##########
@@ -395,12 +400,14 @@ public QueryResult(int index, long minRankAtIndex, double percentile) {
      *   <li>delta: the maximum span of the rank.
      * </ul>
      */
-    private static class StatsTuple implements Serializable {
+    public static class StatsTuple implements Serializable {
         private static final long serialVersionUID = 1L;
-        private final double value;
+        private double value;
         private long g;
         private long delta;
 
+        public StatsTuple() {}

Review Comment:
   Is this change needed to make this class a POJO? If so, why are its member variables private?
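For reference, here is a minimal sketch of a POJO-compliant `StatsTuple`. It assumes Flink's documented POJO rules: the class is public and, if nested, static; it has a public no-argument constructor; and every non-static, non-transient field is either public and non-final or exposed through a public getter/setter pair. `QuantileSummarySketch` and the three-argument constructor are hypothetical and exist only to keep the sketch self-contained.

```java
import java.io.Serializable;

/** Hypothetical outer class standing in for QuantileSummary. */
class QuantileSummarySketch {

    /**
     * Shaped to satisfy Flink's POJO rules: public static nested class,
     * public no-arg constructor, and public non-final fields.
     */
    public static class StatsTuple implements Serializable {
        private static final long serialVersionUID = 1L;

        // Public, non-final fields, so Flink's POJO analysis can access them directly.
        public double value;
        public long g;
        public long delta;

        /** Public no-arg constructor required for POJO type extraction. */
        public StatsTuple() {}

        /** Hypothetical convenience constructor for illustration. */
        public StatsTuple(double value, long g, long delta) {
            this.value = value;
            this.g = g;
            this.delta = delta;
        }
    }
}
```

Keeping the fields private would also work, provided each one gets a public getter and setter that follow the Java Beans naming conventions (e.g. `getValue()`/`setValue(double)`).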



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/common/util/quantile/QuantileSummary.java:
##########
@@ -36,6 +37,7 @@
  * algorithm proposed in the paper: "Space-efficient Online Computation of Quantile Summaries" by
  * Greenwald, Michael and Khanna, Sanjeev. (https://doi.org/10.1145/375663.375670)
  */
+@TypeInfo(QuantileSummaryTypeInfoFactory.class)

Review Comment:
   Would it be simpler to put both `QuantileSummary` and `QuantileSummaryTypeInfoFactory` directly under the package `common/util`?
   
   Note that `QuantileSummaryTypeInfoFactory` should be an internal class that users do not need to access directly.



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/lsh/LSHModel.java:
##########
@@ -373,34 +395,45 @@ public TopKFunction(String distCol, int numNearestNeighbors) {
         }
 
         @Override
-        public PriorityQueue<Row> createAccumulator() {
-            return new PriorityQueue<>(numNearestNeighbors, new DistColComparator(distCol));
+        public List<Row> createAccumulator() {
+            return new ArrayList<>(numNearestNeighbors);
         }
 
         @Override
-        public PriorityQueue<Row> add(Row value, PriorityQueue<Row> accumulator) {
-            if (accumulator.size() == numNearestNeighbors) {
-                Row peek = accumulator.peek();
-                if (accumulator.comparator().compare(value, peek) < 0) {
-                    accumulator.poll();
-                }
+        public List<Row> add(Row value, List<Row> accumulator) {
+            if (topKRows == null) {
+                topKRows = new PriorityQueue<>(numNearestNeighbors, new DistColComparator(distCol));
+                topKRows.addAll(accumulator);
             }
-            accumulator.add(value);
-            return accumulator;
+            insert(value, topKRows);
+            return new ArrayList<>(topKRows);

Review Comment:
   It would be quite inefficient to copy the entire queue into a new list every time we add a value.
   
   How about we use `PriorityQueue<Row>` as the accumulator and let `getResult` return a `List<Row>`? I think we will only need to serialize the output of `getResult`, not the accumulator itself.
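A minimal sketch of the suggested approach follows. It assumes the function only needs the shape of Flink's `AggregateFunction` (`createAccumulator`, `add`, `getResult`, `merge`). To stay self-contained it does not depend on Flink: `TopKDistances` is a hypothetical stand-in for `TopKFunction`, and it keeps `double` distances instead of `Row`s compared on `distCol`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical stand-in for TopKFunction; keeps the k smallest distances. */
class TopKDistances {
    private final int k;

    /** k must be >= 1 (PriorityQueue rejects an initial capacity below 1). */
    TopKDistances(int k) {
        this.k = k;
    }

    /** Max-heap: the head is the largest of the k smallest distances seen so far. */
    PriorityQueue<Double> createAccumulator() {
        return new PriorityQueue<>(k, Comparator.reverseOrder());
    }

    /** O(log k) per value; the accumulator is updated in place, never copied. */
    PriorityQueue<Double> add(double value, PriorityQueue<Double> acc) {
        if (acc.size() < k) {
            acc.add(value);
        } else if (value < acc.peek()) {
            acc.poll(); // evict the current largest distance
            acc.add(value);
        }
        return acc;
    }

    /** The heap is converted to a sorted list only once, when the result is emitted. */
    List<Double> getResult(PriorityQueue<Double> acc) {
        List<Double> result = new ArrayList<>(acc);
        Collections.sort(result);
        return result;
    }

    /** Folds every element of b into a, keeping at most k elements. */
    PriorityQueue<Double> merge(PriorityQueue<Double> a, PriorityQueue<Double> b) {
        for (double v : b) {
            add(v, a);
        }
        return a;
    }
}
```

With this design, each `add` touches only the heap, and the single O(k log k) sort happens once in `getResult`.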



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
