[
https://issues.apache.org/jira/browse/FLINK-5956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15897148#comment-15897148
]
ASF GitHub Bot commented on FLINK-5956:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3470#discussion_r104392189
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionWithRetract.scala
---
@@ -72,37 +72,34 @@ abstract class MinWithRetractAggFunction[T](implicit
ord: Ordering[T]) extends A
a.f1 -= 1L
- if (!a.f2.containsKey(v)) {
- throw TableException("unexpected retract message")
- } else {
- var count = a.f2.get(v)
- count -= 1L
- if (count == 0) {
- //remove the key v from the map if the number of appearance of
the value v is 0
- a.f2.remove(v)
- //if the total count is 0, we could just simply set the f0(min)
to the initial value
- if (a.f1 == 0) {
- a.f0 = getInitValue
- return
- }
- //if v is the current min value, we have to iterate the map to
find the 2nd smallest
- // value to replace v as the min value
- if (v == a.f0) {
- val iterator = a.f2.keySet().iterator()
- var key = iterator.next()
- a.f0 = key
- while (iterator.hasNext()) {
- key = iterator.next()
- if (ord.compare(a.f0, key) > 0) {
- a.f0 = key
- }
+ var count = a.f2.get(v)
+ count -= 1L
+ if (count == 0) {
+ //remove the key v from the map if the number of appearance of the
value v is 0
+ a.f2.remove(v)
+ //if the total count is 0, we could just simply set the f0(min) to
the initial value
+ if (a.f1 == 0) {
--- End diff --
remove `a.f1` from accumulator?
> Add retract method into the aggregateFunction
> ---------------------------------------------
>
> Key: FLINK-5956
> URL: https://issues.apache.org/jira/browse/FLINK-5956
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Shaoxuan Wang
> Assignee: Shaoxuan Wang
>
> Retraction method is help for processing updated message. It will also very
> helpful for window Aggregation. This PR will first add retraction methods
> into the aggregateFunctions, such that on-going over window Aggregation can
> get benefit from it.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)