GitHub user liurenjie1024 commented on a diff in the pull request:
https://github.com/apache/flink/pull/5688#discussion_r190859490
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/KeyedProcessFunctionWithCleanupState.scala
---
@@ -44,8 +45,20 @@ abstract class KeyedProcessFunctionWithCleanupState[K, I, O](queryConfig: Stream
   protected def registerProcessingCleanupTimer(
     ctx: KeyedProcessFunction[K, I, O]#Context,
     currentTime: Long): Unit = {
-    if (stateCleaningEnabled) {
+    registerCleanupTimer(ctx, currentTime, TimeDomain.PROCESSING_TIME)
+  }
+  protected def registerEventCleanupTimer(
--- End diff --
I put it in the same PR because I didn't want it to block this PR,
but I agree that we should move it to a separate one.
---