[ https://issues.apache.org/jira/browse/FLINK-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15897148#comment-15897148 ]
ASF GitHub Bot commented on FLINK-5956: --------------------------------------- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3470#discussion_r104392189 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionWithRetract.scala --- @@ -72,37 +72,34 @@ abstract class MinWithRetractAggFunction[T](implicit ord: Ordering[T]) extends A a.f1 -= 1L - if (!a.f2.containsKey(v)) { - throw TableException("unexpected retract message") - } else { - var count = a.f2.get(v) - count -= 1L - if (count == 0) { - //remove the key v from the map if the number of appearance of the value v is 0 - a.f2.remove(v) - //if the total count is 0, we could just simply set the f0(min) to the initial value - if (a.f1 == 0) { - a.f0 = getInitValue - return - } - //if v is the current min value, we have to iterate the map to find the 2nd smallest - // value to replace v as the min value - if (v == a.f0) { - val iterator = a.f2.keySet().iterator() - var key = iterator.next() - a.f0 = key - while (iterator.hasNext()) { - key = iterator.next() - if (ord.compare(a.f0, key) > 0) { - a.f0 = key - } + var count = a.f2.get(v) + count -= 1L + if (count == 0) { + //remove the key v from the map if the number of appearance of the value v is 0 + a.f2.remove(v) + //if the total count is 0, we could just simply set the f0(min) to the initial value + if (a.f1 == 0) { --- End diff -- remove `a.f1` from accumulator? > Add retract method into the aggregateFunction > --------------------------------------------- > > Key: FLINK-5956 > URL: https://issues.apache.org/jira/browse/FLINK-5956 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL > Reporter: Shaoxuan Wang > Assignee: Shaoxuan Wang > > Retraction method is help for processing updated message. It will also very > helpful for window Aggregation. This PR will first add retraction methods > into the aggregateFunctions, such that on-going over window Aggregation can > get benefit from it. -- This message was sent by Atlassian JIRA (v6.3.15#6346)