[ 
https://issues.apache.org/jira/browse/FLINK-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15897149#comment-15897149
 ] 

ASF GitHub Bot commented on FLINK-5956:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3470#discussion_r104392040
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala
 ---
    @@ -115,12 +112,28 @@ abstract class MaxWithRetractAggFunction[T](implicit 
ord: Ordering[T]) extends A
       }
     
       override def merge(accumulators: JList[Accumulator]): Accumulator = {
    -    val ret = accumulators.get(0)
    +    val ret = 
accumulators.get(0).asInstanceOf[MaxWithRetractAccumulator[T]]
         var i: Int = 1
         while (i < accumulators.size()) {
           val a = 
accumulators.get(i).asInstanceOf[MaxWithRetractAccumulator[T]]
           if (a.f1 != 0) {
    -        accumulate(ret.asInstanceOf[MaxWithRetractAccumulator[T]], a.f0)
    +        val iterator = a.f2.keySet().iterator()
    +        while (iterator.hasNext()) {
    +          val key = iterator.next()
    +          //updating the resulting max value if needed
    +          if (ord.compare(ret.f0, key) < 0) {
    --- End diff --
    
    I think we can simply compare the max values of both accumulators (no need 
to compare all in the hash set) and merge both hash sets:
    ```
    // set max element
    if (ord.compare(ret.f0, key) < 0) {
      ret.f0 = a.f0;
    }
    // merge hash maps
    for (T key: a.f2.keySet()) {
      if (ret.f2.containsKey(key)) {
        ret.f2.put(key, ret.f2.get(key) + count)
      } else {
        ret.f2.put(key, a.f2.get(key))
      }
    }
    ```



> Add retract method into the aggregateFunction
> ---------------------------------------------
>
>                 Key: FLINK-5956
>                 URL: https://issues.apache.org/jira/browse/FLINK-5956
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Shaoxuan Wang
>            Assignee: Shaoxuan Wang
>
> Retraction method is help for processing updated message. It will also very 
> helpful for window Aggregation. This PR will first add retraction methods 
> into the aggregateFunctions, such that on-going over window Aggregation can 
> get benefit from it.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to