[
https://issues.apache.org/jira/browse/FLINK-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15895910#comment-15895910
]
ASF GitHub Bot commented on FLINK-5956:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3470#discussion_r104298261
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala
---
@@ -49,13 +50,25 @@ abstract class SumAggFunction[T: Numeric] extends AggregateFunction[T] {
val v = value.asInstanceOf[T]
val a = accumulator.asInstanceOf[SumAccumulator[T]]
a.f0 = numeric.plus(v, a.f0)
- a.f1 = true
+ a.f1 += 1
+ }
+ }
+
+ override def retract(accumulator: Accumulator, value: Any): Unit = {
+ if (value != null) {
+ val v = value.asInstanceOf[T]
+ val a = accumulator.asInstanceOf[SumAccumulator[T]]
+ a.f0 = numeric.plus(v, a.f0)
+ a.f1 -= 1
+ if (a.f1 < 0) {
--- End diff --
Do we want to check for these errors? The check adds overhead, and it is the
responsibility of the runtime to call the functions correctly. I'd rather add
tests with custom aggregation functions to verify the behavior than add
overhead to the most widely used aggregation functions. We didn't add these
checks to `CountAggregate` and `AverageAggregate` either.
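As a rough illustration of the test-based approach suggested above, here is a minimal, self-contained sketch with no Flink dependency. `SumAccumulator`, `SketchSumAgg`, and `RetractSketch` are hypothetical stand-ins, not the classes from the pull request, and the sketch assumes that `retract` should subtract the value from the running sum. The aggregate performs no range check on the counter; correct call order is verified by driving it from outside, the way a runtime test might.

```scala
// Hypothetical stand-in for the accumulator in the diff:
// f0 holds the running sum, f1 counts the values currently accumulated.
final class SumAccumulator[T](var f0: T, var f1: Long)

// Hypothetical sum aggregate without any sanity checks on f1.
class SketchSumAgg[T](implicit numeric: Numeric[T]) {
  def createAccumulator(): SumAccumulator[T] =
    new SumAccumulator[T](numeric.zero, 0L)

  def accumulate(a: SumAccumulator[T], value: Any): Unit =
    if (value != null) {
      a.f0 = numeric.plus(a.f0, value.asInstanceOf[T])
      a.f1 += 1
    }

  // Assumption of this sketch: retraction subtracts the value and
  // decrements the counter. The caller is trusted to retract only
  // values that were accumulated before.
  def retract(a: SumAccumulator[T], value: Any): Unit =
    if (value != null) {
      a.f0 = numeric.minus(a.f0, value.asInstanceOf[T])
      a.f1 -= 1
    }

  // None when no values are currently accumulated.
  def getValue(a: SumAccumulator[T]): Option[T] =
    if (a.f1 > 0) Some(a.f0) else None
}

// Plays the role of the runtime: accumulates, then retracts, then reads.
object RetractSketch {
  def sumAfter(adds: Seq[Int], retracts: Seq[Int]): Option[Int] = {
    val agg = new SketchSumAgg[Int]
    val acc = agg.createAccumulator()
    adds.foreach(v => agg.accumulate(acc, v))
    retracts.foreach(v => agg.retract(acc, v))
    agg.getValue(acc)
  }
}
```

A test would then assert on the result of a known call sequence, for example that accumulating 1, 2, 3 and retracting 2 leaves a sum of 4, which keeps the cost of verification out of the per-record code path.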
> Add retract method into the aggregateFunction
> ---------------------------------------------
>
> Key: FLINK-5956
> URL: https://issues.apache.org/jira/browse/FLINK-5956
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Shaoxuan Wang
> Assignee: Shaoxuan Wang
>
> A retraction method is helpful for processing updated messages. It will also
> be very helpful for window aggregation. This PR will first add retraction
> methods to the aggregateFunctions, so that the ongoing over-window
> aggregation work can benefit from them.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)