wuchong commented on a change in pull request #12680:
URL: https://github.com/apache/flink/pull/12680#discussion_r442278052
##########
File path:
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/over/RowTimeRangeBoundedPrecedingFunction.java
##########
@@ -158,6 +158,21 @@ public void processElement(
// register event time timer
ctx.timerService().registerEventTimeTimer(triggeringTs);
}
+ registerCleanupTimer(ctx, triggeringTs);
+ }
+ }
+
+ private void registerCleanupTimer(
+ KeyedProcessFunction<K, RowData, RowData>.Context ctx,
+ long timestamp) throws Exception {
+ // calculate safe timestamp to cleanup states
+ long cleanupTimestamp = timestamp + precedingOffset + 1;
+ // update timestamp and register timer if needed
+ Long curCleanupTimestamp = cleanupTsState.value();
+ if (curCleanupTimestamp == null || curCleanupTimestamp < cleanupTimestamp) {
+ // we don't delete existing timer since it may delete timer for data processing
Review comment:
Yes. This is a trade-off to avoid registering too many timers. But the
factor of 1.5 is open for discussion.
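
To make the trade-off concrete, here is a minimal, standalone sketch; it is
not the code in this PR. The quoted hunk does not show the 1.5 factor, so the
sketch assumes that the cleanup timer is pushed out to
timestamp + 1.5 * precedingOffset + 1 and that a new one is registered only
when the stored cleanup timestamp falls below timestamp + precedingOffset + 1.
The class name, method name and slackFactor parameter are hypothetical, and a
TreeSet stands in for the timer service.

```java
import java.util.TreeSet;

public class CleanupTimerSketch {

    /**
     * Counts the distinct cleanup timers registered for a sequence of
     * triggering timestamps. slackFactor = 1.0 means no slack;
     * slackFactor = 1.5 is the assumed reading of "1.5 times".
     */
    public static int countCleanupTimers(
            long[] timestamps, long precedingOffset, double slackFactor) {
        TreeSet<Long> timers = new TreeSet<>(); // stand-in for the timer service
        Long curCleanupTimestamp = null;        // stand-in for cleanupTsState
        for (long timestamp : timestamps) {
            // earliest point at which state for this row is safe to drop
            long minCleanup = timestamp + precedingOffset + 1;
            // later point actually registered, so one timer covers many rows
            long maxCleanup = timestamp + (long) (precedingOffset * slackFactor) + 1;
            if (curCleanupTimestamp == null || curCleanupTimestamp < minCleanup) {
                // the old timer is left in place, as in the quoted comment
                timers.add(maxCleanup);
                curCleanupTimestamp = maxCleanup;
            }
        }
        return timers.size();
    }

    public static void main(String[] args) {
        long[] ts = new long[100];
        for (int i = 0; i < ts.length; i++) {
            ts[i] = i * 10L; // one row every 10 ms
        }
        System.out.println(countCleanupTimers(ts, 1000L, 1.0)); // prints 100
        System.out.println(countCleanupTimers(ts, 1000L, 1.5)); // prints 2
    }
}
```

Under these assumptions, a larger factor means fewer timers but state that
lives longer after it becomes eligible for cleanup, which is why the exact
value is a judgment call.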