libenchao commented on a change in pull request #12680:
URL: https://github.com/apache/flink/pull/12680#discussion_r442008249



##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/over/ProcTimeRangeBoundedPrecedingFunction.java
##########
@@ -128,19 +138,42 @@ public void processElement(
                        rowList = new ArrayList<RowData>();
                        // register timer to process event once the current millisecond passed
                        ctx.timerService().registerProcessingTimeTimer(currentTime + 1);
+                       registerRetractTimer(ctx, currentTime);
                }
                rowList.add(input);
                inputState.put(currentTime, rowList);
        }
 
+       private void registerRetractTimer(KeyedProcessFunction<K, RowData, RowData>.Context ctx, long timestamp) throws Exception {
+               // calculate safe timestamp to retract all records
+               long retractTimestamp = timestamp + precedingTimeBoundary + 1;
+               // update timestamp and register timer if needed
+               Long curRetractTimestamp = retractTsState.value();
+               if (curRetractTimestamp == null || curRetractTimestamp < retractTimestamp) {
+                       // we don't delete existing timer since it may delete timer for data processing
+                       ctx.timerService().registerProcessingTimeTimer(retractTimestamp);
+                       retractTsState.update(retractTimestamp);
+               }
+       }
+
        @Override
        public void onTimer(
                        long timestamp,
                        KeyedProcessFunction<K, RowData, RowData>.OnTimerContext ctx,
                        Collector<RowData> out) throws Exception {
                if (needToCleanupState(timestamp)) {
                        // clean up and return
-                       cleanupState(inputState, accState);
+                       cleanupState(inputState, accState, retractTsState);
+                       function.cleanup();
+                       return;
+               }
+
+               Long retractTimestamp = retractTsState.value();
+               // if retractTsState has not been updated then it is safe to retract all records
+               if (retractTimestamp != null && retractTimestamp <= timestamp) {
+                       inputState.clear();
+                       accState.clear();
+                       retractTsState.clear();

Review comment:
       Can we use `cleanupState(inputState, accState, retractTsState);` here?
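
       To illustrate the idea, here is a minimal sketch of replacing several separate `clear()` calls with one varargs helper call. `ClearableState`, `ToyValueState`, and `CleanupSketch` are hypothetical stand-ins written for this example and are not Flink classes. The real `cleanupState` in the base class may also do extra bookkeeping beyond clearing the states passed in, so it is worth confirming that this is acceptable on the retract path.

```java
/** Hypothetical stand-in for a state handle; only clear() matters here. */
interface ClearableState {
    void clear();
}

/** Toy single-value state, used only to observe the effect of clear(). */
final class ToyValueState<T> implements ClearableState {
    private T value;

    ToyValueState(T initial) {
        this.value = initial;
    }

    T value() {
        return value;
    }

    @Override
    public void clear() {
        value = null;
    }
}

final class CleanupSketch {
    /** Varargs helper (assumed shape): clears every state handle passed in. */
    static void cleanupState(ClearableState... states) {
        for (ClearableState state : states) {
            state.clear();
        }
    }

    public static void main(String[] args) {
        ToyValueState<String> inputState = new ToyValueState<>("rows");
        ToyValueState<String> accState = new ToyValueState<>("acc");
        ToyValueState<Long> retractTsState = new ToyValueState<>(42L);

        // One call instead of three separate clear() calls.
        cleanupState(inputState, accState, retractTsState);

        System.out.println(inputState.value() == null
                && accState.value() == null
                && retractTsState.value() == null);
    }
}
```

       Besides being shorter, a single helper call keeps the retract path and the cleanup path from drifting apart if another state is added later.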

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/over/ProcTimeRangeBoundedPrecedingFunction.java
##########
@@ -128,19 +138,42 @@ public void processElement(
                        rowList = new ArrayList<RowData>();
                        // register timer to process event once the current millisecond passed
                        ctx.timerService().registerProcessingTimeTimer(currentTime + 1);
+                       registerRetractTimer(ctx, currentTime);
                }
                rowList.add(input);
                inputState.put(currentTime, rowList);
        }
 
+       private void registerRetractTimer(KeyedProcessFunction<K, RowData, RowData>.Context ctx, long timestamp) throws Exception {

Review comment:
       This line seems too long.

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/over/ProcTimeRangeBoundedPrecedingFunction.java
##########
@@ -107,6 +111,12 @@ public void open(Configuration parameters) throws Exception {
                        new ValueStateDescriptor<RowData>("accState", accTypeInfo);
                accState = getRuntimeContext().getState(stateDescriptor);
 
+               ValueStateDescriptor<Long> retractTsStateDescriptor = new ValueStateDescriptor<Long>(

Review comment:
       Very small suggestion, not a strong opinion.
   ```suggestion
                ValueStateDescriptor<Long> retractTsStateDescriptor = new ValueStateDescriptor<>(
   ```
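
       For context, a small sketch of why the diamond form is equivalent: since Java 7 the compiler infers the constructor's type argument from the declared type on the left-hand side. `Descriptor` and `DiamondSketch` are hypothetical stand-ins for this example only, and the state name `"example-state"` is made up.

```java
/** Hypothetical stand-in for a generic state descriptor. */
final class Descriptor<T> {
    final String name;
    final Class<T> type;

    Descriptor(String name, Class<T> type) {
        this.name = name;
        this.type = type;
    }
}

final class DiamondSketch {
    /** Explicit type argument repeated on the constructor call. */
    static Descriptor<Long> explicitForm() {
        return new Descriptor<Long>("example-state", Long.class);
    }

    /** Diamond operator: the compiler infers <Long> from the return type. */
    static Descriptor<Long> diamondForm() {
        return new Descriptor<>("example-state", Long.class);
    }

    public static void main(String[] args) {
        Descriptor<Long> a = explicitForm();
        Descriptor<Long> b = diamondForm();
        // Both forms build the same kind of object; only the source text differs.
        System.out.println(a.name.equals(b.name) && a.type == b.type);
    }
}
```

       The change is purely cosmetic and does not affect the compiled behavior, which is why it is only a small suggestion.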




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

