protos37 commented on a change in pull request #12680:
URL: https://github.com/apache/flink/pull/12680#discussion_r442061446
##########
File path:
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/over/ProcTimeRangeBoundedPrecedingFunction.java
##########
@@ -128,19 +138,42 @@ public void processElement(
rowList = new ArrayList<RowData>();
// register timer to process event once the current millisecond passed
ctx.timerService().registerProcessingTimeTimer(currentTime + 1);
+ registerRetractTimer(ctx, currentTime);
}
rowList.add(input);
inputState.put(currentTime, rowList);
}
+ private void registerRetractTimer(KeyedProcessFunction<K, RowData, RowData>.Context ctx, long timestamp) throws Exception {
+ // calculate safe timestamp to retract all records
+ long retractTimestamp = timestamp + precedingTimeBoundary + 1;
+ // update timestamp and register timer if needed
+ Long curRetractTimestamp = retractTsState.value();
+ if (curRetractTimestamp == null || curRetractTimestamp < retractTimestamp) {
+ // we don't delete existing timer since it may delete timer for data processing
+ ctx.timerService().registerProcessingTimeTimer(retractTimestamp);
+ retractTsState.update(retractTimestamp);
+ }
+ }
+
@Override
public void onTimer(
long timestamp,
KeyedProcessFunction<K, RowData, RowData>.OnTimerContext ctx,
Collector<RowData> out) throws Exception {
if (needToCleanupState(timestamp)) {
// clean up and return
- cleanupState(inputState, accState);
+ cleanupState(inputState, accState, retractTsState);
+ function.cleanup();
+ return;
+ }
+
+ Long retractTimestamp = retractTsState.value();
+ // if retractTsState has not been updated then it is safe to retract all records
+ if (retractTimestamp != null && retractTimestamp <= timestamp) {
+ inputState.clear();
+ accState.clear();
+ retractTsState.clear();
Review comment:
I chose not to use `cleanupState()` because it is provided by
`KeyedProcessFunctionWithCleanupState`, which exists for cleanup by state
retention, and its other methods are expected to work only when cleanup is
enabled. This change is a distinct feature from retention-based cleanup, so I
think it is better not to depend on the cleanup code path.
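
To make the intended behavior easier to follow, here is a minimal standalone
sketch of the retract-timer bookkeeping in the diff above. It is a simplified
model, not the actual operator: `RetractTimerSketch`, the `timers` set, and
the `String` rows are hypothetical stand-ins for the Flink timer service and
keyed state, and the accumulator state is omitted. It only illustrates that
the latest retract timestamp wins and that state is cleared directly, without
going through the retention cleanup path.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical, Flink-free model of the retract-timer logic in this diff.
class RetractTimerSketch {
    final long precedingTimeBoundary;
    // Stand-in for inputState: rows grouped by processing time.
    final TreeMap<Long, List<String>> inputState = new TreeMap<>();
    // Stand-in for the registered processing-time timers.
    final TreeSet<Long> timers = new TreeSet<>();
    // Stand-in for retractTsState; null means "no retract timer pending".
    Long retractTs = null;

    RetractTimerSketch(long precedingTimeBoundary) {
        this.precedingTimeBoundary = precedingTimeBoundary;
    }

    void processElement(long currentTime, String row) {
        List<String> rows = inputState.get(currentTime);
        if (rows == null) {
            rows = new ArrayList<>();
            // Timer to process the rows once the current millisecond has passed.
            timers.add(currentTime + 1);
            registerRetractTimer(currentTime);
        }
        rows.add(row);
        inputState.put(currentTime, rows);
    }

    private void registerRetractTimer(long timestamp) {
        // First instant at which every buffered row is outside the range.
        long candidate = timestamp + precedingTimeBoundary + 1;
        if (retractTs == null || retractTs < candidate) {
            // Older timers are left in place; they may also drive data processing.
            timers.add(candidate);
            retractTs = candidate;
        }
    }

    // Returns true if the timer retracted (cleared) all state.
    boolean onTimer(long timestamp) {
        timers.remove(timestamp);
        if (retractTs != null && retractTs <= timestamp) {
            // No newer input moved the retract timestamp, so clear directly.
            inputState.clear();
            retractTs = null;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        RetractTimerSketch s = new RetractTimerSketch(1000L);
        s.processElement(5L, "a");
        s.processElement(20L, "b");
        System.out.println(s.onTimer(1006L)); // stale retract timer
        System.out.println(s.onTimer(1021L)); // latest retract timer
    }
}
```

In this model a stale retract timer is a no-op because newer input has already
pushed the stored timestamp forward; only the timer that matches the stored
timestamp clears state.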