protos37 commented on a change in pull request #12680:
URL: https://github.com/apache/flink/pull/12680#discussion_r442061446



##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/over/ProcTimeRangeBoundedPrecedingFunction.java
##########
@@ -128,19 +138,42 @@ public void processElement(
                        rowList = new ArrayList<RowData>();
                        // register timer to process event once the current millisecond passed
                        ctx.timerService().registerProcessingTimeTimer(currentTime + 1);
+                       registerRetractTimer(ctx, currentTime);
                }
                rowList.add(input);
                inputState.put(currentTime, rowList);
        }
 
+       private void registerRetractTimer(KeyedProcessFunction<K, RowData, RowData>.Context ctx, long timestamp) throws Exception {
+               // calculate safe timestamp to retract all records
+               long retractTimestamp = timestamp + precedingTimeBoundary + 1;
+               // update timestamp and register timer if needed
+               Long curRetractTimestamp = retractTsState.value();
+               if (curRetractTimestamp == null || curRetractTimestamp < retractTimestamp) {
+                       // we don't delete existing timer since it may delete timer for data processing
+                       ctx.timerService().registerProcessingTimeTimer(retractTimestamp);
+                       retractTsState.update(retractTimestamp);
+               }
+       }
+
        @Override
        public void onTimer(
                        long timestamp,
                        KeyedProcessFunction<K, RowData, RowData>.OnTimerContext ctx,
                        Collector<RowData> out) throws Exception {
                if (needToCleanupState(timestamp)) {
                        // clean up and return
-                       cleanupState(inputState, accState);
+                       cleanupState(inputState, accState, retractTsState);
+                       function.cleanup();
+                       return;
+               }
+
+               Long retractTimestamp = retractTsState.value();
+               // if retractTsState has not been updated then it is safe to retract all records
+               if (retractTimestamp != null && retractTimestamp <= timestamp) {
+                       inputState.clear();
+                       accState.clear();
+                       retractTsState.clear();

Review comment:
       I chose not to use `cleanupState()` because it is provided by 
`KeyedProcessFunctionWithCleanupState`, which handles cleanup by state 
retention, and its other methods are expected to work only if cleanup is 
enabled. This change is a feature distinct from retention-based cleanup, so 
I think it is better not to depend on the cleanup code path.
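
       To illustrate the intent, here is a minimal, Flink-free sketch of the 
retract-timer logic for a single key. It is only a model under stated 
assumptions: `RetractTimerSketch`, its `String` rows, and the `TreeSet` of 
timers are hypothetical stand-ins for `inputState`, `retractTsState`, and the 
timer service, and `accState` and the aggregation step are left out. It shows 
that a stale retract timer is ignored and that state is cleared directly, 
without going through `cleanupState()`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

/** Simplified model of the retract-timer branch for one key (not Flink code). */
class RetractTimerSketch {
    private final long precedingTimeBoundary;
    // Stand-in for MapState<Long, List<RowData>> inputState.
    private final Map<Long, List<String>> inputState = new HashMap<>();
    // Stand-in for ValueState<Long> retractTsState; null means "not set".
    private Long retractTs;
    // Stand-in for the processing-time timers registered for this key.
    private final TreeSet<Long> timers = new TreeSet<>();

    RetractTimerSketch(long precedingTimeBoundary) {
        this.precedingTimeBoundary = precedingTimeBoundary;
    }

    void processElement(long currentTime, String row) {
        List<String> rows = inputState.get(currentTime);
        if (rows == null) {
            rows = new ArrayList<>();
            // Timer that processes the rows once the current millisecond has passed.
            timers.add(currentTime + 1);
            registerRetractTimer(currentTime);
        }
        rows.add(row);
        inputState.put(currentTime, rows);
    }

    private void registerRetractTimer(long timestamp) {
        // Earliest time at which every buffered row has left the window.
        long retractTimestamp = timestamp + precedingTimeBoundary + 1;
        if (retractTs == null || retractTs < retractTimestamp) {
            // The older retract timer is not deleted; it simply becomes stale.
            timers.add(retractTimestamp);
            retractTs = retractTimestamp;
        }
    }

    /** Returns true if this timer firing cleared all state. */
    boolean onTimer(long timestamp) {
        timers.remove(timestamp);
        if (retractTs != null && retractTs <= timestamp) {
            // Clear state directly instead of relying on a retention-cleanup helper.
            inputState.clear();
            retractTs = null;
            return true;
        }
        return false;
    }

    boolean isEmpty() {
        return inputState.isEmpty() && retractTs == null;
    }
}
```

       With a preceding boundary of 10 ms, rows arriving at 100 and 105 
register retract timers at 111 and 116. In this model only the firing at 116 
clears the state, because the stored retract timestamp has moved past 111 by 
then.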




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

