protos37 commented on a change in pull request #12680:
URL: https://github.com/apache/flink/pull/12680#discussion_r442237237
##########
File path:
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/over/RowTimeRangeBoundedPrecedingFunction.java
##########
@@ -158,6 +158,21 @@ public void processElement(
// register event time timer
ctx.timerService().registerEventTimeTimer(triggeringTs);
}
+ registerCleanupTimer(ctx, triggeringTs);
+ }
+ }
+
+ private void registerCleanupTimer(
+ KeyedProcessFunction<K, RowData, RowData>.Context ctx,
+ long timestamp) throws Exception {
+ // calculate safe timestamp to cleanup states
+ long cleanupTimestamp = timestamp + precedingOffset + 1;
+ // update timestamp and register timer if needed
+ Long curCleanupTimestamp = cleanupTsState.value();
+ if (curCleanupTimestamp == null || curCleanupTimestamp < cleanupTimestamp) {
+ // we don't delete existing timer since it may delete timer for data processing
Review comment:
About treating the cleanup timestamp as a range: if I understood correctly,
this is a tradeoff between reducing state promptly and the overhead of
registering timers. Since we don't have a specific criterion like
`maxRetentionTime` here, how should we choose how generous the cleanup margin
is? Is it okay to go with 1.5 times, as you mentioned?
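
To make the question concrete, here is a minimal standalone sketch of how I read the range idea. It is not the code in this PR and has no Flink dependency: the class `CleanupTimerSketch`, the `factor` parameter, and the list standing in for the timer service are all hypothetical. The assumption is that a new cleanup timer is registered only when the stored cleanup timestamp falls below `timestamp + precedingOffset + 1`, and that the new timer is then placed at `timestamp + precedingOffset * factor + 1`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of range-based cleanup timer registration (no Flink dependency). */
public class CleanupTimerSketch {
    private final long precedingOffset;
    // Assumed "generosity" multiplier: 1.0 means exact cleanup, 1.5 is the value discussed above.
    private final double factor;
    // Stands in for cleanupTsState; null means no cleanup timer has been registered yet.
    private Long curCleanupTimestamp;
    // Stands in for the timer service; records every cleanup timer that would be registered.
    private final List<Long> registeredTimers = new ArrayList<>();

    public CleanupTimerSketch(long precedingOffset, double factor) {
        this.precedingOffset = precedingOffset;
        this.factor = factor;
    }

    /** Called once per element with the element's triggering timestamp. */
    public void onElement(long triggeringTs) {
        // Earliest point at which state for this element can safely be dropped.
        long minCleanupTimestamp = triggeringTs + precedingOffset + 1;
        // Later, more generous point that is actually registered.
        long maxCleanupTimestamp = triggeringTs + (long) (precedingOffset * factor) + 1;
        // Register a new timer only if the current one no longer covers the safe point.
        if (curCleanupTimestamp == null || curCleanupTimestamp < minCleanupTimestamp) {
            registeredTimers.add(maxCleanupTimestamp);
            curCleanupTimestamp = maxCleanupTimestamp;
        }
    }

    public int timerCount() {
        return registeredTimers.size();
    }

    public Long currentCleanupTimestamp() {
        return curCleanupTimestamp;
    }
}
```

In this sketch, a larger `factor` means fewer timer registrations per key, but state can linger for up to `precedingOffset * (factor - 1)` longer than strictly necessary after the last element. That is the tradeoff I am asking about.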