[ 
https://issues.apache.org/jira/browse/FLINK-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15895910#comment-15895910
 ] 

ASF GitHub Bot commented on FLINK-5956:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3470#discussion_r104298261
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala
 ---
    @@ -49,13 +50,25 @@ abstract class SumAggFunction[T: Numeric] extends 
AggregateFunction[T] {
           val v = value.asInstanceOf[T]
           val a = accumulator.asInstanceOf[SumAccumulator[T]]
           a.f0 = numeric.plus(v, a.f0)
    -      a.f1 = true
    +      a.f1 += 1
    +    }
    +  }
    +
    +  override def retract(accumulator: Accumulator, value: Any): Unit = {
    +    if (value != null) {
    +      val v = value.asInstanceOf[T]
    +      val a = accumulator.asInstanceOf[SumAccumulator[T]]
    +      a.f0 = numeric.plus(v, a.f0)
    +      a.f1 -= 1
    +      if (a.f1 < 0) {
    --- End diff --
    
    Do we want to check for these errors? It adds overhead and is the 
responsibility of the runtime to call the functions correctly. I'd rather add 
tests with custom aggregation functions to verify the behavior instead of 
adding overhead to the most widely used aggregation functions. We didn't add 
these checks to `CountAggregate` and `AverageAggregate`.


> Add retract method into the aggregateFunction
> ---------------------------------------------
>
>                 Key: FLINK-5956
>                 URL: https://issues.apache.org/jira/browse/FLINK-5956
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Shaoxuan Wang
>            Assignee: Shaoxuan Wang
>
> Retraction method is help for processing updated message. It will also very 
> helpful for window Aggregation. This PR will first add retraction methods 
> into the aggregateFunctions, such that on-going over window Aggregation can 
> get benefit from it.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to