[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16521498#comment-16521498
 ] 

ASF GitHub Bot commented on FLINK-9514:
---------------------------------------

Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/6186
  
    I'm still a bit worried about the time-align problem on recovery(because 
I've met serval case that would become disaster on production if we don't do 
the time-align on recovery. (One instance: we used the DFS to store the 
checkpoint data, and the DFS went into safe-mode because of some problems, we 
took several hours to notice that and also took some times to address the 
issue. After addressing DFS's issue, user's jobs were resumed and begin to run 
correctly. In this case, if we don't do the time-align on recovery, then user's 
state maybe already totally expired(when TTL <= the `system down time`)).
    
    I had a second thought on this problem, and I think maybe we could do that 
without a full scanning of the records, the approach is outlined below.
    
    - 1. We need to remember the timestamp when performing the checkpoint, 
let's say it `checkpoint_ts`.
    - 2. We also need to remember the timestamp when recovering from the 
checkpoint, let's say it `recovery_ts`.
    - 3. For each record, we remember it's last update timestamp, let's say it 
`update_ts`.
    - 5. And the current time stamp is `current_ts`.
    - 4. Then we could use the follow condition `checkpoint_ts - update_ts + 
current_s - recovery_ts >= TTL` to check whether the record is expired. If it's 
true then record is expired, otherwise the record is still alive.
    
    What do you think? @azagrebin ,  and @StefanRRichter would be really nice 
to learn your opinion about this problem.


> Create wrapper with TTL logic for value state
> ---------------------------------------------
>
>                 Key: FLINK-9514
>                 URL: https://issues.apache.org/jira/browse/FLINK-9514
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.6.0
>            Reporter: Andrey Zagrebin
>            Assignee: Andrey Zagrebin
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState<V> implements ValueState<V> {
>   ValueState<TtlValue<V>> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
>     TtlValue<V> valueWithTtl = underlyingState.get();
>     // ttl logic here (e.g. update timestamp)
>     return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to