libenchao commented on a change in pull request #12680:
URL: https://github.com/apache/flink/pull/12680#discussion_r442008249
##########
File path:
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/over/ProcTimeRangeBoundedPrecedingFunction.java
##########
@@ -128,19 +138,42 @@ public void processElement(
rowList = new ArrayList<RowData>();
// register timer to process event once the current
millisecond passed
ctx.timerService().registerProcessingTimeTimer(currentTime + 1);
+ registerRetractTimer(ctx, currentTime);
}
rowList.add(input);
inputState.put(currentTime, rowList);
}
+ private void registerRetractTimer(KeyedProcessFunction<K, RowData,
RowData>.Context ctx, long timestamp) throws Exception {
+ // calculate safe timestamp to retract all records
+ long retractTimestamp = timestamp + precedingTimeBoundary + 1;
+ // update timestamp and register timer if needed
+ Long curRetractTimestamp = retractTsState.value();
+ if (curRetractTimestamp == null || curRetractTimestamp <
retractTimestamp) {
+ // we don't delete existing timer since it may delete
timer for data processing
+
ctx.timerService().registerProcessingTimeTimer(retractTimestamp);
+ retractTsState.update(retractTimestamp);
+ }
+ }
+
@Override
public void onTimer(
long timestamp,
KeyedProcessFunction<K, RowData,
RowData>.OnTimerContext ctx,
Collector<RowData> out) throws Exception {
if (needToCleanupState(timestamp)) {
// clean up and return
- cleanupState(inputState, accState);
+ cleanupState(inputState, accState, retractTsState);
+ function.cleanup();
+ return;
+ }
+
+ Long retractTimestamp = retractTsState.value();
+ // if retractTsState has not been updated then it is safe to
retract all records
+ if (retractTimestamp != null && retractTimestamp <= timestamp) {
+ inputState.clear();
+ accState.clear();
+ retractTsState.clear();
Review comment:
can we use `cleanupState(inputState, accState, retractTsState);` here?
##########
File path:
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/over/ProcTimeRangeBoundedPrecedingFunction.java
##########
@@ -128,19 +138,42 @@ public void processElement(
rowList = new ArrayList<RowData>();
// register timer to process event once the current
millisecond passed
ctx.timerService().registerProcessingTimeTimer(currentTime + 1);
+ registerRetractTimer(ctx, currentTime);
}
rowList.add(input);
inputState.put(currentTime, rowList);
}
+ private void registerRetractTimer(KeyedProcessFunction<K, RowData,
RowData>.Context ctx, long timestamp) throws Exception {
Review comment:
seems this line is too long.
##########
File path:
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/over/ProcTimeRangeBoundedPrecedingFunction.java
##########
@@ -107,6 +111,12 @@ public void open(Configuration parameters) throws
Exception {
new ValueStateDescriptor<RowData>("accState",
accTypeInfo);
accState = getRuntimeContext().getState(stateDescriptor);
+ ValueStateDescriptor<Long> retractTsStateDescriptor = new
ValueStateDescriptor<Long>(
Review comment:
Very small suggestion, not a strong opinion.
```suggestion
ValueStateDescriptor<Long> retractTsStateDescriptor = new
ValueStateDescriptor<>(
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org