GitHub user fhueske commented on the issue:
https://github.com/apache/flink/pull/3641
Oh, I just remembered that the OVER RANGE semantics are slightly different:
all rows that arrive in the same millisecond need to get the same aggregation
value. So we need not only to retract all records that are too old but also
to accumulate all records received in the same millisecond before emitting any of them.
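To make the difference concrete, here is a small framework-free Java sketch of the semantics (class and method names are hypothetical, and SUM stands in for an arbitrary aggregate). It assumes the window of a row with timestamp t is [t - range, t], inclusive on both ends; that boundary is an assumption to check against the operator's actual retraction logic. It compares the expected RANGE result with a naive approach that emits each row as soon as it arrives.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrates OVER RANGE semantics on (arrival ms, value) pairs; all names are hypothetical.
class RangeOverSemantics {
    static final class Row {
        final long ts;    // processing-time timestamp in milliseconds
        final long value; // value fed into SUM
        Row(long ts, long value) { this.ts = ts; this.value = value; }
    }

    // Expected RANGE result: each row sees ALL rows in [ts - rangeMs, ts],
    // including rows of the same millisecond that arrive after it.
    static List<Long> rangeSemantics(List<Row> rows, long rangeMs) {
        List<Long> out = new ArrayList<>();
        for (Row cur : rows) {
            long sum = 0;
            for (Row r : rows) {
                if (r.ts >= cur.ts - rangeMs && r.ts <= cur.ts) sum += r.value;
            }
            out.add(sum);
        }
        return out;
    }

    // Naive approach: emitting on arrival only sees rows received so far,
    // so rows of the same millisecond end up with different values.
    static List<Long> emitOnArrival(List<Row> rows, long rangeMs) {
        List<Long> out = new ArrayList<>();
        for (int i = 0; i < rows.size(); i++) {
            Row cur = rows.get(i);
            long sum = 0;
            for (int j = 0; j <= i; j++) {
                Row r = rows.get(j);
                if (r.ts >= cur.ts - rangeMs && r.ts <= cur.ts) sum += r.value;
            }
            out.add(sum);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(new Row(1000, 1), new Row(1000, 2), new Row(1001, 4), new Row(2500, 8));
        System.out.println("RANGE semantics: " + rangeSemantics(rows, 1000)); // [3, 3, 7, 8]
        System.out.println("emit on arrival: " + emitOnArrival(rows, 1000));  // [1, 3, 7, 8]
    }
}
```

The first row of millisecond 1000 should report 3, not 1, which is why the rows of a millisecond cannot be emitted one by one as they arrive.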
Therefore, we would need to redesign the `ProcessFunction` a bit.
- In `processElement()` we put the new record in the MapState and register
a processing-time timer for current time + 1. This triggers a callback to
`onTimer()` once current time + 1 is reached. Because processing time only moves
forward, no further rows for the current millisecond can arrive after that point.
- When `onTimer()` is called, we process the rows of timestamp - 1: retract
all old values, accumulate all new values, and emit all rows of timestamp - 1.
The implementation of `onTimer()` can reuse most of what is currently done in
`processElement()`.
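The two steps can be sketched without Flink as a small simulation. This is not the `ProcessFunction` from the PR: a `TreeMap` stands in for the `MapState`, a `TreeSet` for the processing-time timer service, a running SUM for the aggregate's accumulate/retract, and every name is hypothetical. It assumes processing time never goes backwards and that the window of timestamp ts is [ts - range, ts].

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;

// Framework-free simulation of the proposed timer-based design; all names are hypothetical.
class TimerBasedRangeOver {
    private final long rangeMs;                                      // RANGE ... PRECEDING interval
    private final TreeMap<Long, List<Long>> state = new TreeMap<>(); // stand-in for MapState: rows per arrival ms
    private final TreeSet<Long> timers = new TreeSet<>();            // stand-in for timers; duplicates collapse
    private long sum = 0;                                            // accumulator over the current window
    final List<String> emitted = new ArrayList<>();                  // "value->aggregate" output rows

    TimerBasedRangeOver(long rangeMs) { this.rangeMs = rangeMs; }

    // processElement(): buffer the row under its arrival time and request a callback at time + 1.
    // Assumes `now` is non-decreasing across calls (processing time).
    void processElement(long now, long value) {
        advanceTime(now); // the clock must have passed every earlier timer before this row arrived
        state.computeIfAbsent(now, k -> new ArrayList<>()).add(value);
        timers.add(now + 1);
    }

    // Fires every registered timer whose timestamp has been reached, in increasing order.
    void advanceTime(long now) {
        while (!timers.isEmpty() && timers.first() <= now) {
            onTimer(timers.pollFirst());
        }
    }

    // onTimer(): no more rows can arrive for timestamp - 1, so finalize that millisecond.
    private void onTimer(long timestamp) {
        long ts = timestamp - 1;
        // Retract rows older than ts - rangeMs and drop them from state.
        // They were all accumulated earlier, because their timers fired before this one.
        SortedMap<Long, List<Long>> expired = state.headMap(ts - rangeMs);
        for (List<Long> bucket : expired.values()) {
            for (long v : bucket) sum -= v;
        }
        expired.clear();
        // Accumulate every row of millisecond ts before emitting any of them.
        List<Long> rows = state.getOrDefault(ts, List.of());
        for (long v : rows) sum += v;
        // All rows of the same millisecond are emitted with the same aggregate value.
        for (long v : rows) emitted.add(v + "->" + sum);
    }

    public static void main(String[] args) {
        TimerBasedRangeOver f = new TimerBasedRangeOver(1000);
        f.processElement(1000, 1);
        f.processElement(1000, 2);
        f.processElement(1001, 4); // clock reached 1001: rows of 1000 are emitted first
        f.processElement(2500, 8);
        f.advanceTime(2501);
        System.out.println(f.emitted); // [1->3, 2->3, 4->7, 8->8]
    }
}
```

Only one timer per millisecond is needed even when many rows share it; Flink's documentation states that timers are deduplicated per key and timestamp, which the `TreeSet` mimics here.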
Does that make sense @rtudoran?
Do you want to make the change? Otherwise, I can also do it before merging.
Sorry for recognizing this just now :-/
Best, Fabian