[ 
https://issues.apache.org/jira/browse/FLINK-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15895999#comment-15895999
 ] 

ASF GitHub Bot commented on FLINK-5956:
---------------------------------------

Github user shaoxuan-wang commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3470#discussion_r104301312
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala ---
    @@ -49,13 +50,25 @@ abstract class SumAggFunction[T: Numeric] extends AggregateFunction[T] {
           val v = value.asInstanceOf[T]
           val a = accumulator.asInstanceOf[SumAccumulator[T]]
           a.f0 = numeric.plus(v, a.f0)
    -      a.f1 = true
    +      a.f1 += 1
    +    }
    +  }
    +
    +  override def retract(accumulator: Accumulator, value: Any): Unit = {
    +    if (value != null) {
    +      val v = value.asInstanceOf[T]
    +      val a = accumulator.asInstanceOf[SumAccumulator[T]]
    +      a.f0 = numeric.plus(v, a.f0)
    +      a.f1 -= 1
    +      if (a.f1 < 0) {
    --- End diff --
    
    This exception usually won't happen if we use retract for bounded over 
windows. With the ongoing DataStream retraction design, however, the source 
table can be a table with a primary key, which can generate retraction 
messages. The downstream streaming job may then receive garbage retraction 
messages, because the log of the source table could contain out-of-date 
retractions. The initial intent of throwing the exception here was to leave a 
marker for the future retraction design. Let me add a comment here (or maybe 
just note it down myself) and remove the checks for now.
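
    For illustration, here is a minimal standalone sketch of a count-based sum 
with retraction and without the negative-count check. It does not use Flink's 
AggregateFunction or Accumulator types; SumAcc and SumWithRetract are 
hypothetical names, and returning an Option from getValue is an assumption. 
The sketch subtracts in retract (numeric.minus), which is also an assumption 
about the intended behaviour; the quoted hunk shows numeric.plus there.

```scala
// Standalone sketch, not the code from the pull request.
// SumAcc and SumWithRetract are hypothetical names used only for illustration.
final class SumAcc[T](var sum: T, var count: Long)

class SumWithRetract[T](implicit numeric: Numeric[T]) {

  def createAccumulator(): SumAcc[T] = new SumAcc[T](numeric.zero, 0L)

  // Add a value and record that one more row contributes to the sum.
  def accumulate(acc: SumAcc[T], value: Option[T]): Unit =
    value.foreach { v =>
      acc.sum = numeric.plus(acc.sum, v)
      acc.count += 1
    }

  // Undo an earlier accumulate: subtract the value and lower the row count.
  // There is deliberately no check for count < 0 here; a stale retraction
  // would simply drive the count negative.
  def retract(acc: SumAcc[T], value: Option[T]): Unit =
    value.foreach { v =>
      acc.sum = numeric.minus(acc.sum, v)
      acc.count -= 1
    }

  // No result while no rows contribute to the sum.
  def getValue(acc: SumAcc[T]): Option[T] =
    if (acc.count > 0) Some(acc.sum) else None
}
```

    Keeping a count instead of a boolean flag is what lets the aggregate 
return to "no value" once every accumulated row has been retracted.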


> Add retract method into the aggregateFunction
> ---------------------------------------------
>
>                 Key: FLINK-5956
>                 URL: https://issues.apache.org/jira/browse/FLINK-5956
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Shaoxuan Wang
>            Assignee: Shaoxuan Wang
>
> Retraction methods are helpful for processing updated messages. They will 
> also be very helpful for window aggregation. This PR will first add 
> retraction methods into the aggregate functions, so that the ongoing over 
> window aggregation work can benefit from them.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
