wuchong commented on a change in pull request #12680:
URL: https://github.com/apache/flink/pull/12680#discussion_r442216042
##########
File path:
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/over/RowTimeRangeBoundedPrecedingFunction.java
##########
@@ -158,6 +158,21 @@ public void processElement(
// register event time timer
ctx.timerService().registerEventTimeTimer(triggeringTs);
}
+ registerCleanupTimer(ctx, triggeringTs);
+ }
+ }
+
+ private void registerCleanupTimer(
+ KeyedProcessFunction<K, RowData, RowData>.Context ctx,
+ long timestamp) throws Exception {
+ // calculate safe timestamp to cleanup states
+ long cleanupTimestamp = timestamp + precedingOffset + 1;
+ // update timestamp and register timer if needed
+ Long curCleanupTimestamp = cleanupTsState.value();
+ if (curCleanupTimestamp == null || curCleanupTimestamp < cleanupTimestamp) {
+ // we don't delete existing timer since it may delete timer for data processing
Review comment:
This may cause performance problems if we register a timer for each record,
because each timer is an entry in state. A better solution might be to use the
`InternalTimerService` provided by `AbstractStreamOperator`, which can register
timers by namespace. We could then use separate namespaces for cleanup and
data processing.
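
To illustrate why separate namespaces help, here is a minimal plain-Java sketch. It is not Flink code: `NamespacedTimersSketch` and the namespace names `"data"` and `"cleanup"` are hypothetical, and the map is only an in-memory stand-in for a timer service that keys timers by namespace and timestamp. It shows that a cleanup timer can be deleted without touching a data-processing timer registered for the same timestamp, which is the concern noted in the diff comment.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

/** Hypothetical in-memory model of namespaced timers; not a Flink class. */
final class NamespacedTimersSketch {
    // namespace -> timestamps of the timers registered under that namespace
    private final Map<String, TreeSet<Long>> timers = new HashMap<>();

    void register(String namespace, long timestamp) {
        timers.computeIfAbsent(namespace, k -> new TreeSet<>()).add(timestamp);
    }

    void delete(String namespace, long timestamp) {
        TreeSet<Long> set = timers.get(namespace);
        if (set != null) {
            set.remove(timestamp);
        }
    }

    boolean isRegistered(String namespace, long timestamp) {
        TreeSet<Long> set = timers.get(namespace);
        return set != null && set.contains(timestamp);
    }

    public static void main(String[] args) {
        NamespacedTimersSketch t = new NamespacedTimersSketch();
        // Both timers share a timestamp but live in different namespaces.
        t.register("data", 2000L);
        t.register("cleanup", 2000L);
        // Deleting the cleanup timer leaves the data-processing timer alone.
        t.delete("cleanup", 2000L);
        System.out.println("data timer registered: " + t.isRegistered("data", 2000L));
        System.out.println("cleanup timer registered: " + t.isRegistered("cleanup", 2000L));
    }
}
```

In the operator itself the same separation would presumably come from passing a namespace to the `InternalTimerService` register and delete calls, rather than from a hand-written map.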
Besides, it would be better to treat the cleanup timestamp as a range instead
of a point. For example, if the current cleanup timer is in `(timestamp +
precedingOffset, precedingOffset + precedingOffset * 1.5)` (similar to
`CleanupState#registerProcessingCleanupTimer`), then we don't need to register
a new one. This avoids removing and registering a timer for each record and is
friendlier to the state backend.
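
A rough sketch of the range idea, again in plain Java rather than against the Flink API. Assumptions: `RangeCleanupTimerSketch` is a hypothetical class, the `cleanupTs` field stands in for `cleanupTsState`, the timer set models a dedicated cleanup namespace (so deleting from it is safe), and the upper bound is read as `timestamp + precedingOffset * 1.5`. The lower bound reuses `timestamp + precedingOffset + 1` from the diff.

```java
import java.util.TreeSet;

/** Hypothetical model of range-based cleanup timer registration; not a Flink class. */
final class RangeCleanupTimerSketch {
    private final long precedingOffset;
    private final TreeSet<Long> cleanupTimers = new TreeSet<>(); // simulated cleanup namespace
    private Long cleanupTs;    // stands in for cleanupTsState.value(); null if unset
    private int registrations; // how many timers were registered in total

    RangeCleanupTimerSketch(long precedingOffset) {
        this.precedingOffset = precedingOffset;
    }

    /** Returns true if a new cleanup timer had to be registered for this record. */
    boolean onElement(long timestamp) {
        // Earliest timestamp at which cleanup is safe, as computed in the diff.
        long minCleanupTs = timestamp + precedingOffset + 1;
        if (cleanupTs != null && cleanupTs >= minCleanupTs) {
            return false; // the existing timer is still late enough, keep it
        }
        // Assumed upper bound: roughly timestamp + precedingOffset * 1.5.
        long maxCleanupTs = timestamp + precedingOffset + precedingOffset / 2 + 1;
        if (cleanupTs != null) {
            cleanupTimers.remove(cleanupTs); // safe only because the namespace is separate
        }
        cleanupTimers.add(maxCleanupTs);
        cleanupTs = maxCleanupTs;
        registrations++;
        return true;
    }

    int registrations() {
        return registrations;
    }

    Long currentCleanupTs() {
        return cleanupTs;
    }

    public static void main(String[] args) {
        RangeCleanupTimerSketch s = new RangeCleanupTimerSketch(1000L);
        long[] timestamps = {0L, 100L, 500L, 501L};
        for (long ts : timestamps) {
            System.out.println("ts=" + ts + " registered=" + s.onElement(ts)
                + " cleanupTs=" + s.currentCleanupTs());
        }
        System.out.println("total registrations: " + s.registrations());
    }
}
```

With `precedingOffset = 1000`, the records at 0, 100, 500 and 501 trigger two registrations instead of four, because the timer set for the first record stays valid until a record's minimum safe cleanup time passes it.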
This might be a big refactoring, so I'm fine with adding a TODO comment here
and creating a follow-up issue for it.