fhueske commented on code in PR #27487:
URL: https://github.com/apache/flink/pull/27487#discussion_r2741314218


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/AbstractNonTimeUnboundedPrecedingOver.java:
##########
@@ -288,6 +284,19 @@ public void processElement(
         }
 
         // Reset acc state since we can have out of order inserts in the 
ordered list
+        resetAndCleanupAggFuncs();
+    }
+
+    @Override
+    public void onTimer(long timestamp, OnTimerContext ctx, Collector<RowData> 
out)
+            throws Exception {
+        if (stateCleaningEnabled) {
+            cleanupState(idState, valueMapState, accMapState, sortedListState);
+            resetAndCleanupAggFuncs();

Review Comment:
   I think we should check that the firing timer is actually a clean up timer, 
even though this operator does not register any other timers. This makes it a 
bit safer and the intent more clear.
   * `KeyedProcessFunctionWithCleanupState.isProcessingTimeTimer()` 
   * `KeyedProcessFunctionWithCleanupState.needToCleanupState()`



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/AbstractNonTimeUnboundedPrecedingOver.java:
##########


Review Comment:
   Can this be removed (including import of class and static function)?



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/AbstractNonTimeUnboundedPrecedingOver.java:
##########
@@ -288,6 +284,19 @@ public void processElement(
         }
 
         // Reset acc state since we can have out of order inserts in the 
ordered list
+        resetAndCleanupAggFuncs();
+    }
+
+    @Override
+    public void onTimer(long timestamp, OnTimerContext ctx, Collector<RowData> 
out)

Review Comment:
   add some comments explaining that we are cleaning up state due to State TTL?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to