[
https://issues.apache.org/jira/browse/FLINK-6491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16004906#comment-16004906
]
ASF GitHub Bot commented on FLINK-6491:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3863#discussion_r115776739
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcessFunctionWithCleanupState.scala
---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.{Long => JLong}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.state.State
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+
+abstract class ProcessFunctionWithCleanupState[IN,OUT](qConfig: StreamQueryConfig)
+ extends ProcessFunction[IN, OUT]{
+
+ protected val minRetentionTime = qConfig.getMinIdleStateRetentionTime
+ protected val maxRetentionTime = qConfig.getMaxIdleStateRetentionTime
+ protected val stateCleaningEnabled = minRetentionTime > 1 && maxRetentionTime > 1
+ // interval in which clean-up timers are registered
+ protected val cleanupTimerInterval = maxRetentionTime - minRetentionTime
+
+ // holds the latest registered cleanup timer
+ private var cleanupTimeState: ValueState[JLong] = _
+
+ protected def initCleanupTimeState(stateName: String) {
+ if (stateCleaningEnabled) {
+ val inputCntDescriptor: ValueStateDescriptor[JLong] =
+ new ValueStateDescriptor[JLong](stateName, Types.LONG)
+ cleanupTimeState = getRuntimeContext.getState(inputCntDescriptor)
+ }
+ }
+
+ protected def registerProcessingCleanupTimer(
+ ctx: ProcessFunction[IN, OUT]#Context,
+ currentTime: Long): Unit = {
+ if (stateCleaningEnabled) {
+
+ val earliestCleanup = currentTime + minRetentionTime
+
+ // last registered timer
+ val lastCleanupTime = cleanupTimeState.value()
+
+ if (lastCleanupTime == null || earliestCleanup >= lastCleanupTime + cleanupTimerInterval) {
--- End diff --
This condition is wrong and would lead to state being discarded before the
min retention time has passed. It should be
`(lastCleanupTime == null || earliestCleanup > lastCleanupTime)`.
With this condition, the timer logic works as follows:
- If the earliest time at which the state may be cleaned
  (`earliestCleanup`) is later than the last registered timer
  (`lastCleanupTime`), we need to register a new (later) timer.
- The new timer is registered for `currentTime + maxRetentionTime`, such
  that all records that arrive between now and
  `currentTime + cleanupTimerInterval` can reuse this timer.
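As a rough, Flink-free sketch of the logic described above: the class `CleanupTimerModel` and the method `onRecord` are hypothetical names that are not part of the PR, and an in-memory `Option[Long]` stands in for `cleanupTimeState`. It only models when a new timer would be registered, assuming the proposed condition.

```scala
// Hypothetical model of the proposed timer logic; not the code in the PR.
final class CleanupTimerModel(minRetentionTime: Long, maxRetentionTime: Long) {

  // Stands in for cleanupTimeState: the latest registered cleanup timer, if any.
  private var lastCleanupTime: Option[Long] = None

  /** Returns Some(timestamp) if a new timer is registered for this record,
    * or None if the previously registered timer can be reused. */
  def onRecord(currentTime: Long): Option[Long] = {
    // Earliest time at which state touched by this record may be cleaned.
    val earliestCleanup = currentTime + minRetentionTime

    // Proposed condition: no timer yet, or the last timer would fire too early.
    if (lastCleanupTime.forall(last => earliestCleanup > last)) {
      // Register the new timer as late as allowed so later records can reuse it.
      val cleanupTime = currentTime + maxRetentionTime
      lastCleanupTime = Some(cleanupTime)
      Some(cleanupTime)
    } else {
      None
    }
  }
}
```

Under these assumptions, with `minRetentionTime = 10` and `maxRetentionTime = 30`, a record at time 0 registers a timer for 30, records up to time 20 reuse it, and a record at time 21 registers a new timer for 51.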
Please double check the logic @sunjincheng121.
> Add QueryConfig to specify state retention time for streaming queries
> ---------------------------------------------------------------------
>
> Key: FLINK-6491
> URL: https://issues.apache.org/jira/browse/FLINK-6491
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Affects Versions: 1.3.0
> Reporter: Fabian Hueske
> Assignee: sunjincheng
> Priority: Critical
>
> By now we have a couple of streaming operators (group-windows, over-windows,
> non-windowed aggregations) that require operator state. Since state is not
> automatically cleaned up by Flink, we need to add a mechanism to configure a
> state retention time.
> If configured, a query will retain state for a specified period of state
> inactivity. If state is not accessed within this period of time, it will be
> cleared. I propose to add two parameters for this, a min and a max retention
> time. The min retention time specifies the earliest time and the max
> retention time the latest time at which state is cleared. The reasoning for
> having two parameters is that we can avoid registering many timers if we have
> more freedom in when to discard state.
> This issue also introduces a QueryConfig object which can be passed to a
> streaming query, when it is emitted to a TableSink or converted to a
> DataStream (append or retraction).