[ https://issues.apache.org/jira/browse/FLINK-6491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16004900#comment-16004900 ]
ASF GitHub Bot commented on FLINK-6491:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3863#discussion_r115775350
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcessFunctionWithCleanupState.scala ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.{Long => JLong}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.state.State
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+
+abstract class ProcessFunctionWithCleanupState[IN,OUT](qConfig: StreamQueryConfig)
+  extends ProcessFunction[IN, OUT]{
+
+  protected val minRetentionTime = qConfig.getMinIdleStateRetentionTime
+  protected val maxRetentionTime = qConfig.getMaxIdleStateRetentionTime
+  protected val stateCleaningEnabled = minRetentionTime > 1 && maxRetentionTime > 1
+  // interval in which clean-up timers are registered
+  protected val cleanupTimerInterval = maxRetentionTime - minRetentionTime
+
+  // holds the latest registered cleanup timer
+  private var cleanupTimeState: ValueState[JLong] = _
+
+  protected def initCleanupTimeState(stateName: String) {
+    if (stateCleaningEnabled) {
+      val inputCntDescriptor: ValueStateDescriptor[JLong] =
+        new ValueStateDescriptor[JLong](stateName, Types.LONG)
+      cleanupTimeState = getRuntimeContext.getState(inputCntDescriptor)
+    }
+  }
+
+  protected def registerProcessingCleanupTimer(
+    ctx: ProcessFunction[IN, OUT]#Context,
+    currentTime: Long): Unit = {
+    if (stateCleaningEnabled) {
+
+      val earliestCleanup = currentTime + minRetentionTime
+
+      // last registered timer
+      val lastCleanupTime = cleanupTimeState.value()
+
+      if (lastCleanupTime == null || earliestCleanup >= lastCleanupTime + cleanupTimerInterval) {
+        // we need to register a new timer
+        val cleanupTime = earliestCleanup + cleanupTimerInterval
+        // register timer and remember clean-up time
+        ctx.timerService().registerProcessingTimeTimer(cleanupTime)
+        cleanupTimeState.update(cleanupTime)
+      }
+    }
+  }
+  protected def registerEventCleanupTimer(
--- End diff ---
I'm not sure we should define state cleanup in terms of event time.
If an event-time window is followed by a non-windowed aggregate, we would
use both event time and processing time for cleanup. Event time is also
harder to reason about than processing time.
The RocksDB TTL is probably also defined in terms of wall-clock time.
So I propose to drop this method and always use processing time for
cleanup timers.
What do you think, @sunjincheng121?
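A minimal, Flink-free sketch of the processing-time-only approach, where a single clock drives both timer registration and expiry. KeyedCleanupSim and its methods are hypothetical stand-ins for a keyed ProcessFunction, its ValueState, and its timer service. The sketch also assumes two things not shown in the diff: a new timer is registered only when the current one would fire before the min retention has elapsed, and onTimer clears state only when the fired timestamp matches the latest registered cleanup time, so older timers that still fire are ignored.

```scala
import scala.collection.mutable

// Hypothetical, Flink-free model of one key's state under processing-time
// cleanup timers. It stands in for a keyed ProcessFunction, its ValueState,
// and its timer service; none of these names are Flink APIs.
final class KeyedCleanupSim(minRetention: Long, maxRetention: Long) {
  // Pending timers, earliest timestamp first (reverse ordering on a max-heap).
  private val timers = new mutable.PriorityQueue[Long]()(Ordering[Long].reverse)
  // Latest registered cleanup time (plays the role of cleanupTimeState).
  private var lastCleanupTime: Option[Long] = None
  // The operator state that should eventually be cleaned up.
  var state: Option[String] = None

  // Called for every record at processing time `time`.
  def process(time: Long, value: String): Unit = {
    advanceTo(time)
    state = Some(value)
    // Assumption: register a later timer only if the current one would fire
    // before minRetention has elapsed since this access.
    if (lastCleanupTime.forall(last => time + minRetention > last)) {
      val cleanupTime = time + maxRetention
      timers.enqueue(cleanupTime)
      lastCleanupTime = Some(cleanupTime)
    }
  }

  // Moves the single processing-time clock forward and fires due timers.
  def advanceTo(time: Long): Unit =
    while (timers.nonEmpty && timers.head <= time) {
      onTimer(timers.dequeue())
    }

  // Assumption: only the latest registered timer clears state; older
  // (stale) timers still fire but are ignored.
  private def onTimer(timestamp: Long): Unit =
    if (lastCleanupTime.contains(timestamp)) {
      state = None
      lastCleanupTime = None
    }
}
```

For example, with minRetention = 5 and maxRetention = 10, accesses at times 0 and 6 register timers for 10 and 16. The timer at 10 fires but is stale, so the state written at 6 survives until the timer at 16 clears it. Because only one time domain is involved, the clock that advances `advanceTo` is the same one used to compute every cleanup time.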
> Add QueryConfig to specify state retention time for streaming queries
> ---------------------------------------------------------------------
>
> Key: FLINK-6491
> URL: https://issues.apache.org/jira/browse/FLINK-6491
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Affects Versions: 1.3.0
> Reporter: Fabian Hueske
> Assignee: sunjincheng
> Priority: Critical
>
> By now we have several streaming operators (group windows, over windows,
> non-windowed aggregations) that require operator state. Since state is not
> automatically cleaned up by Flink, we need a mechanism to configure a
> state retention time.
> If configured, a query retains state for a specified period of
> inactivity. If state is not accessed within this period, it is
> cleared. I propose two parameters for this: a min and a max retention
> time. The min retention time is the earliest and the max retention time
> the latest point, after the last access, at which state is cleared. Having
> two parameters gives us more freedom in when to discard state, which lets
> us avoid registering many timers.
> This issue also introduces a QueryConfig object that can be passed to a
> streaming query when it is emitted to a TableSink or converted to a
> DataStream (append or retraction).
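To illustrate why a gap between min and max retention reduces the number of timers, here is a minimal, Flink-free sketch that lists the cleanup timers a single key would register for a sequence of processing-time accesses. CleanupTimerCount and registeredTimers are hypothetical names, and the registration rule (register a timer at time + maxRetention only when the pending one would fire before time + minRetention) is an assumption of this sketch, not the exact condition used in the diff.

```scala
// Hypothetical, Flink-free sketch: which cleanup timers does one key register?
object CleanupTimerCount {
  // Returns the cleanup timers registered for the given non-decreasing
  // processing-time accesses. A new timer (at time + maxRetention) is
  // registered only when the latest one would fire before time + minRetention.
  def registeredTimers(accessTimes: Seq[Long], minRetention: Long, maxRetention: Long): Vector[Long] = {
    val (timers, _) = accessTimes.foldLeft((Vector.empty[Long], Option.empty[Long])) {
      case ((registered, lastCleanup), time) =>
        if (lastCleanup.forall(last => time + minRetention > last)) {
          val cleanupTime = time + maxRetention
          (registered :+ cleanupTime, Some(cleanupTime))
        } else {
          (registered, lastCleanup)
        }
    }
    timers
  }

  def main(args: Array[String]): Unit = {
    val accesses = 0L until 10L // one access per time unit
    println(registeredTimers(accesses, minRetention = 5L, maxRetention = 10L))
    // Vector(10, 16)
    println(registeredTimers(accesses, minRetention = 10L, maxRetention = 10L))
    // Vector(10, 11, 12, 13, 14, 15, 16, 17, 18, 19)
  }
}
```

With accesses at times 0 to 9, a 5-unit gap between min and max retention yields two timers instead of ten, while after every access the latest pending timer still lies between time + minRetention and time + maxRetention.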
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)