[ https://issues.apache.org/jira/browse/FLINK-6491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16005913#comment-16005913 ]

ASF GitHub Bot commented on FLINK-6491:
---------------------------------------

Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3863#discussion_r115907975
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcessFunctionWithCleanupState.scala ---
    @@ -0,0 +1,99 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import java.lang.{Long => JLong}
    +
    +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    +import org.apache.flink.api.common.state.State
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.table.api.{StreamQueryConfig, Types}
    +
    +abstract class ProcessFunctionWithCleanupState[IN,OUT](qConfig: StreamQueryConfig)
    +  extends ProcessFunction[IN, OUT]{
    +
    +  protected val minRetentionTime = qConfig.getMinIdleStateRetentionTime
    +  protected val maxRetentionTime = qConfig.getMaxIdleStateRetentionTime
    +  protected val stateCleaningEnabled = minRetentionTime > 1 && maxRetentionTime > 1
    +  // interval in which clean-up timers are registered
    +  protected val cleanupTimerInterval = maxRetentionTime - minRetentionTime
    +
    +  // holds the latest registered cleanup timer
    +  private var cleanupTimeState: ValueState[JLong] = _
    +
    +  protected def initCleanupTimeState(stateName: String) {
    +    if (stateCleaningEnabled) {
    +      val inputCntDescriptor: ValueStateDescriptor[JLong] =
    +        new ValueStateDescriptor[JLong](stateName, Types.LONG)
    +      cleanupTimeState = getRuntimeContext.getState(inputCntDescriptor)
    +    }
    +  }
    +
    +  protected def registerProcessingCleanupTimer(
    +    ctx: ProcessFunction[IN, OUT]#Context,
    +    currentTime: Long): Unit = {
    +    if (stateCleaningEnabled) {
    +
    +      val earliestCleanup = currentTime + minRetentionTime
    +
    +      // last registered timer
    +      val lastCleanupTime = cleanupTimeState.value()
    +
    +      if (lastCleanupTime == null || earliestCleanup >= lastCleanupTime + cleanupTimerInterval) {
    +        // we need to register a new timer
    +        val cleanupTime = earliestCleanup + cleanupTimerInterval
    +        // register timer and remember clean-up time
    +        ctx.timerService().registerProcessingTimeTimer(cleanupTime)
    +        cleanupTimeState.update(cleanupTime)
    +      }
    +    }
    +  }
    +  protected def registerEventCleanupTimer(
    --- End diff --
    
    I added this method because in a `row-time` OVER window
    (TimeDomain.EVENT_TIME), we always get `0` when we call
    `ctx.timerService.currentProcessingTime`.
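    The point of the comment is that a cleanup rule is only as good as the
    timestamp fed into it. The sketch below is a hypothetical, Flink-free model:
    `clock` stands in for whatever value a caller would pass as `currentTime`,
    and the registration condition is an assumed min/max rule (re-register only
    when the pending timer would fire before `now + minRetention`), not the
    exact condition from the diff. `ClockSourceDemo` and `timersFor` are
    illustrative names only.

```scala
// Hypothetical model (not Flink code): a min/max cleanup rule driven by a
// caller-supplied clock. `pending` stands in for the state holding the latest timer.
object ClockSourceDemo {

  /** Cleanup timers registered for `accesses` state accesses, reading time from `clock`. */
  def timersFor(clock: Int => Long, accesses: Int, minRetention: Long, maxRetention: Long): List[Long] = {
    var pending: Option[Long] = None
    var timers = List.empty[Long]
    for (i <- 0 until accesses) {
      val now = clock(i) // the timestamp that would be passed as `currentTime`
      // Assumed rule: only push the deadline if the pending timer would
      // delete state before the minimum retention after this access.
      if (pending.forall(_ < now + minRetention)) {
        pending = Some(now + maxRetention)
        timers = timers :+ (now + maxRetention)
      }
    }
    timers
  }

  def main(args: Array[String]): Unit = {
    val advancing = timersFor(i => i * 50L, accesses = 5, minRetention = 100, maxRetention = 150)
    val frozen = timersFor(_ => 0L, accesses = 5, minRetention = 100, maxRetention = 150)
    println(advancing) // List(150, 250, 350)
    println(frozen)    // List(150)
  }
}
```

    With an advancing clock (accesses at 0, 50, 100, 150, 200) the deadline
    moves forward and the accesses at 50 and 150 need no new timer. With a
    clock stuck at 0, only the first access registers a timer, so later
    accesses never extend the deadline.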


> Add QueryConfig to specify state retention time for streaming queries
> ---------------------------------------------------------------------
>
>                 Key: FLINK-6491
>                 URL: https://issues.apache.org/jira/browse/FLINK-6491
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.3.0
>            Reporter: Fabian Hueske
>            Assignee: sunjincheng
>            Priority: Critical
>
> So far we have a couple of streaming operators (group-windows, over-windows, 
> non-windowed aggregations) that require operator state. Since state is not 
> automatically cleaned up by Flink, we need to add a mechanism to configure a 
> state retention time. 
> If configured, a query will retain state for a specified period of state 
> inactivity. If state is not accessed within this period of time, it will be 
> cleared. I propose to add two parameters for this, a min and a max retention 
> time. The min retention time specifies the earliest time and the max 
> retention time the latest time at which state is cleared. The reason for 
> having two parameters is that we can avoid registering many timers if we have 
> more freedom in when to discard state.
> This issue also introduces a QueryConfig object which can be passed to a 
> streaming query when it is emitted to a TableSink or converted to a 
> DataStream (append or retraction).
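The timer-saving argument for separate min and max retention times can be
checked with a small Flink-free model. This is a sketch under assumptions:
`RetentionModel` and `registeredTimers` are hypothetical names, and the rule
(register a timer at `t + maxRetention` only when the pending timer would fire
before `t + minRetention`) is one way to honor the minimum retention, not
necessarily the exact condition used in the pull request.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical model of min/max idle-state retention (not Flink code).
// `pending` stands in for the state that remembers the latest registered timer.
object RetentionModel {

  /** Returns the timer timestamps registered for a sequence of state accesses. */
  def registeredTimers(accessTimes: Seq[Long], minRetention: Long, maxRetention: Long): Seq[Long] = {
    require(minRetention <= maxRetention, "min retention must not exceed max retention")
    var pending: Option[Long] = None
    val timers = ArrayBuffer.empty[Long]
    for (t <- accessTimes) {
      // Register a later timer only if the pending one would delete state
      // before the minimum retention after this access has passed.
      if (pending.forall(_ < t + minRetention)) {
        val cleanupTime = t + maxRetention
        timers += cleanupTime
        pending = Some(cleanupTime)
      }
    }
    timers.toSeq
  }

  def main(args: Array[String]): Unit = {
    val accesses = 0L until 100L by 5L // state touched every 5 time units
    val tight = registeredTimers(accesses, minRetention = 30, maxRetention = 30)
    val loose = registeredTimers(accesses, minRetention = 30, maxRetention = 60)
    println(s"min == max: ${tight.size} timers") // 20 timers
    println(s"min <  max: ${loose.size} timers") // 3 timers (at 60, 95, 130)
  }
}
```

With equal min and max every access moves the deadline, so all 20 accesses
register a timer; with a 30-unit gap the same accesses register only 3. In
both cases the pending timer stays between `minRetention` and `maxRetention`
after the last access, assuming the timer handler ignores timers that are no
longer the pending one.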



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
