makeyang created FLINK-9182:
-------------------------------

             Summary: async checkpoints for timer service
                 Key: FLINK-9182
                 URL: https://issues.apache.org/jira/browse/FLINK-9182
             Project: Flink
          Issue Type: Improvement
          Components: State Backends, Checkpointing
    Affects Versions: 1.4.2, 1.5.0
            Reporter: makeyang
             Fix For: 1.4.3, 1.5.1


# problem description:
 ## as the number of 'InternalTimer' objects increases, checkpoints become 
slower and slower
 # improvement design
 ## maintain a stateTableVersion and a snapshotVersions collection in 
InternalTimeServiceManager. both play exactly the same role as their 
counterparts in CopyOnWriteStateTable. one more thing: a read-write lock, 
which protects snapshotVersions and stateTableVersion
 ## for each InternalTimer, add 2 more properties, create version and delete 
version, besides the 3 existing properties: timestamp, key and namespace. each 
time a timer is registered in the timer service, it is created with the current 
stateTableVersion as its create version and -1 as its delete version. each time 
a timer is deleted in the timer service, it is only marked as deleted by setting 
its delete version to the current stateTableVersion. it is not physically 
deleted from the timer service.
 ## each time timers are snapshotted, InternalTimeServiceManager increases its 
stateTableVersion and adds the new value to snapshotVersions. these 2 
operations are protected by the write lock of InternalTimeServiceManager. the 
new stateTableVersion is taken as the snapshot version of this snapshot
 ## shallow copy the <String, HeapInternalTimerService> tuples
 ## then use another thread to asynchronously snapshot everything: key 
serializer, namespace serializer and timers. a timer that is not deleted 
(delete version is -1) and whose create version is less than the snapshot 
version is serialized. a timer whose delete version is not -1 and is greater 
than or equal to the snapshot version is serialized too. any other timer is not 
serialized by this snapshot.
 ## when everything is serialized, remove the snapshot version from 
snapshotVersions. this still happens in the other thread and is guarded by the 
write lock.
 ## last thing: physical deletion of timers. there are 2 places where timers 
are physically deleted. the first is in the timer service: right after a timer 
is marked as deleted by setting its delete version to stateTableVersion, check 
whether snapshotVersions is empty (which means there is no running snapshot) 
and, if so, physically delete the timer. the other place is the iteration over 
timers during a snapshot: when a timer's delete version is less than the min 
value of snapshotVersions, the timer is deleted and no running snapshot needs 
to keep it, so it can be removed.
 ## some more additions: processingTimeTimers and eventTimeTimers for each 
key group used to be a HashSet and are now changed to a ConcurrentHashMap with 
key+namespace+timestamp as the hash key.
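
The steps above can be sketched roughly as follows. This is only a minimal, 
simplified illustration and not the actual patch: VersionedTimerStore, 
VersionedTimer and all method names are hypothetical, the timer is reduced to 
its two new version fields, and a plain String id stands in for 
key+namespace+timestamp. One extra assumption that is not stated above: the 
create version check is also applied to timers that are marked as deleted, so 
that a timer created and deleted after a snapshot started is not serialized 
by it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch of the proposal; none of these names are real Flink classes.
class VersionedTimerStore {
    // Only the two new fields; timestamp, key and namespace are folded into the map key.
    static final class VersionedTimer {
        final int createVersion;
        volatile int deleteVersion = -1; // -1 means "not deleted"

        VersionedTimer(int createVersion) {
            this.createVersion = createVersion;
        }
    }

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final TreeSet<Integer> snapshotVersions = new TreeSet<>(); // guarded by lock
    private int stateTableVersion = 0; // guarded by lock
    private final Map<String, VersionedTimer> timers = new ConcurrentHashMap<>();

    void register(String timerId) {
        lock.readLock().lock();
        try {
            timers.putIfAbsent(timerId, new VersionedTimer(stateTableVersion));
        } finally {
            lock.readLock().unlock();
        }
    }

    void delete(String timerId) {
        lock.readLock().lock();
        try {
            VersionedTimer timer = timers.get(timerId);
            if (timer != null && timer.deleteVersion == -1) {
                timer.deleteVersion = stateTableVersion; // logical delete
                if (snapshotVersions.isEmpty()) { // no running snapshot: physical delete
                    timers.remove(timerId);
                }
            }
        } finally {
            lock.readLock().unlock();
        }
    }

    // Synchronous part of a snapshot: bump the version and record it as running.
    int beginSnapshot() {
        lock.writeLock().lock();
        try {
            snapshotVersions.add(++stateTableVersion);
            return stateTableVersion;
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Assumption: the create-version check is applied to deleted timers as well.
    static boolean isVisible(VersionedTimer timer, int snapshotVersion) {
        int deleted = timer.deleteVersion;
        return timer.createVersion < snapshotVersion
                && (deleted == -1 || deleted >= snapshotVersion);
    }

    // Asynchronous part: meant to be called from the snapshot thread.
    List<String> runSnapshot(int snapshotVersion) {
        List<String> serialized = new ArrayList<>();
        for (Map.Entry<String, VersionedTimer> entry : timers.entrySet()) {
            VersionedTimer timer = entry.getValue();
            int deleted = timer.deleteVersion;
            if (isVisible(timer, snapshotVersion)) {
                serialized.add(entry.getKey()); // stand-in for real serialization
            } else if (deleted != -1 && deleted < minRunningSnapshot()) {
                timers.remove(entry.getKey()); // no running snapshot needs it
            }
        }
        lock.writeLock().lock();
        try {
            snapshotVersions.remove(snapshotVersion);
        } finally {
            lock.writeLock().unlock();
        }
        return serialized;
    }

    private int minRunningSnapshot() {
        lock.readLock().lock();
        try {
            return snapshotVersions.isEmpty() ? Integer.MAX_VALUE : snapshotVersions.first();
        } finally {
            lock.readLock().unlock();
        }
    }

    int size() { return timers.size(); }
}
```

In this sketch a timer deleted while a snapshot is running stays in the map 
until a later snapshot iterates over it, and re-registering a timer that is 
only logically deleted is not handled.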
 # related mailing list thread
 ## 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Slow-flink-checkpoint-td18946.html
 # github pull request
 ## //coming soon



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
