[ https://issues.apache.org/jira/browse/FLINK-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15897148#comment-15897148 ]

ASF GitHub Bot commented on FLINK-5956:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3470#discussion_r104392189
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionWithRetract.scala ---
    @@ -72,37 +72,34 @@ abstract class MinWithRetractAggFunction[T](implicit ord: Ordering[T]) extends A
     
           a.f1 -= 1L
     
    -      if (!a.f2.containsKey(v)) {
    -        throw TableException("unexpected retract message")
    -      } else {
    -        var count = a.f2.get(v)
    -        count -= 1L
    -        if (count == 0) {
    -          //remove the key v from the map if the number of appearance of the value v is 0
    -          a.f2.remove(v)
    -          //if the total count is 0, we could just simply set the f0(min) to the initial value
    -          if (a.f1 == 0) {
    -            a.f0 = getInitValue
    -            return
    -          }
    -          //if v is the current min value, we have to iterate the map to find the 2nd smallest
    -          // value to replace v as the min value
    -          if (v == a.f0) {
    -            val iterator = a.f2.keySet().iterator()
    -            var key = iterator.next()
    -            a.f0 = key
    -            while (iterator.hasNext()) {
    -              key = iterator.next()
    -              if (ord.compare(a.f0, key) > 0) {
    -                a.f0 = key
    -              }
    +      var count = a.f2.get(v)
    +      count -= 1L
    +      if (count == 0) {
    +        //remove the key v from the map if the number of appearance of the value v is 0
    +        a.f2.remove(v)
    +        //if the total count is 0, we could just simply set the f0(min) to the initial value
    +        if (a.f1 == 0) {
    --- End diff --
    
    remove `a.f1` from accumulator?
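
One possible reading of this suggestion, sketched below, is that the total count in `a.f1` is redundant because an empty value-count map already signals that nothing is accumulated. This is a minimal standalone sketch, not the code from the pull request: `MinAcc`, `MinWithRetractSketch`, the `Option`-typed minimum, and the choice to silently ignore an unknown retracted value are all assumptions made for illustration.

```scala
import scala.collection.mutable

// Hypothetical accumulator: the current minimum plus a value -> occurrence-count map.
// There is no separate total counter; an empty map means "nothing accumulated".
final class MinAcc[T] {
  var min: Option[T] = None
  val counts: mutable.Map[T, Long] = mutable.Map.empty
}

object MinWithRetractSketch {
  // Record one occurrence of v and lower the minimum if v is smaller.
  def accumulate[T](acc: MinAcc[T], v: T)(implicit ord: Ordering[T]): Unit = {
    acc.counts(v) = acc.counts.getOrElse(v, 0L) + 1L
    if (acc.min.forall(m => ord.lt(v, m))) acc.min = Some(v)
  }

  // Remove one occurrence of v; recompute the minimum only when the last
  // occurrence of the current minimum disappears.
  def retract[T](acc: MinAcc[T], v: T)(implicit ord: Ordering[T]): Unit = {
    acc.counts.get(v) match {
      case None => () // assumption: an unknown value is ignored in this sketch
      case Some(c) if c > 1L => acc.counts(v) = c - 1L
      case Some(_) =>
        acc.counts.remove(v)
        if (acc.counts.isEmpty) {
          // replaces the `a.f1 == 0` check: no values left, reset the minimum
          acc.min = None
        } else if (acc.min.exists(m => ord.equiv(m, v))) {
          // v was the minimum: scan the remaining keys for the new minimum
          acc.min = Some(acc.counts.keys.min(ord))
        }
    }
  }
}
```

With this shape, retracting the last remaining value resets the minimum through the `isEmpty` check alone, so no extra counter has to be kept in sync on every accumulate and retract call.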


> Add retract method into the aggregateFunction
> ---------------------------------------------
>
>                 Key: FLINK-5956
>                 URL: https://issues.apache.org/jira/browse/FLINK-5956
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Shaoxuan Wang
>            Assignee: Shaoxuan Wang
>
> Retraction methods are helpful for processing updated messages. They will 
> also be very helpful for window aggregation. This PR will first add 
> retraction methods to the aggregate functions, so that the ongoing over 
> window aggregation work can benefit from them.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
