[
https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Fabian Hueske reassigned FLINK-9182:
------------------------------------
Assignee: makeyang
> async checkpoints for timer service
> -----------------------------------
>
> Key: FLINK-9182
> URL: https://issues.apache.org/jira/browse/FLINK-9182
> Project: Flink
> Issue Type: Improvement
> Components: State Backends, Checkpointing
> Affects Versions: 1.5.0, 1.4.2
> Reporter: makeyang
> Assignee: makeyang
> Priority: Minor
> Fix For: 1.4.3, 1.5.1
>
>
> # problem description:
> ## as the number of 'InternalTimer' objects increases, checkpoints become
> slower and slower
> # improvement design
> ## maintain a stateTableVersion and snapshotVersions in
> InternalTimeServiceManager, both playing exactly the same role as their
> counterparts in CopyOnWriteStateTable. one more thing: a read-write lock,
> which is used to protect snapshotVersions and stateTableVersion
> ## for each InternalTimer, add 2 more properties, create version and delete
> version, beside the 3 existing properties: timestamp, key and namespace.
> each time a timer is registered in the timer service, it is created with
> the current stateTableVersion as its create version and -1 as its delete
> version. each time a timer is deleted in the timer service, it is only
> marked as deleted by setting its delete version to the current
> stateTableVersion; it is not physically deleted from the timer service.
> ## each time timers are snapshotted, InternalTimeServiceManager increases
> its stateTableVersion and adds the new stateTableVersion to
> snapshotVersions. these 2 operations are protected by the write lock of
> InternalTimeServiceManager. the new stateTableVersion is taken as the
> snapshot version of this snapshot
> ## shallow copy <String,HeapInternalTimerService> tuples
> ## then use another thread to asynchronously snapshot everything: the key
> serializer, the namespace serializer and the timers. a timer that is not
> deleted (delete version is -1) and whose create version is less than the
> snapshot version is serialized. a timer whose delete version is not -1 and
> is greater than or equal to the snapshot version is serialized as well. any
> other timer is not serialized by this snapshot.
> ## when everything is serialized, remove the snapshot version from
> snapshotVersions. this still happens in the other thread and is guarded by
> the write lock.
> ## last thing: physical deletion of timers. there are 2 places where timers
> are physically deleted. the first is the delete path: each time a timer is
> deleted in the timer service, it is first marked as deleted by setting its
> delete version to the current stateTableVersion. after this, check whether
> the size of snapshotVersions is 0 (which means there is no running
> snapshot) and if so, physically delete the timer. the other place is the
> iteration over timers during a snapshot: when a timer's delete version is
> less than the min value of snapshotVersions, the timer is deleted and no
> running snapshot needs to keep it, so it can be removed.
> ## one more addition: processingTimeTimers and eventTimeTimers for each
> group used to be a HashSet and are now changed to a ConcurrentHashMap with
> key+namespace+timestamp as the hash key.
> # related mailing list thread
> ##
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Slow-flink-checkpoint-td18946.html
> # github pull request
> ## //coming soon
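
The versioning scheme in the quoted design can be sketched roughly as below. This is only an illustration of the idea, not Flink code and not the eventual pull request: the class VersionedTimerStore, its methods, and the use of String keys and namespaces are all hypothetical. One assumption goes beyond the description: the sketch also requires the create version to be less than the snapshot version for logically deleted timers, so that a timer both created and deleted while a snapshot is running is not written by that snapshot. Re-registering a timer that is still retained as logically deleted is not handled here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical sketch of version-tagged timers; not Flink's InternalTimeServiceManager. */
final class VersionedTimerStore {
    static final int NOT_DELETED = -1;

    /** A timer tagged with the versions at which it was created and (logically) deleted. */
    static final class VersionedTimer {
        final long timestamp;
        final String key;
        final String namespace;
        final int createVersion;
        volatile int deleteVersion = NOT_DELETED;

        VersionedTimer(long timestamp, String key, String namespace, int createVersion) {
            this.timestamp = timestamp;
            this.key = key;
            this.namespace = namespace;
            this.createVersion = createVersion;
        }
    }

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private int stateTableVersion = 0;                                   // guarded by lock
    private final TreeSet<Integer> snapshotVersions = new TreeSet<>();   // guarded by lock
    // key+namespace+timestamp is the map key, as in the description.
    private final Map<String, VersionedTimer> timers = new ConcurrentHashMap<>();

    static String id(String key, String namespace, long timestamp) {
        return key + "|" + namespace + "|" + timestamp;
    }

    void register(long timestamp, String key, String namespace) {
        lock.readLock().lock();
        try {
            timers.putIfAbsent(id(key, namespace, timestamp),
                    new VersionedTimer(timestamp, key, namespace, stateTableVersion));
        } finally {
            lock.readLock().unlock();
        }
    }

    /** Marks the timer as deleted; removes it physically only if no snapshot is running. */
    void delete(long timestamp, String key, String namespace) {
        lock.readLock().lock();
        try {
            String id = id(key, namespace, timestamp);
            VersionedTimer t = timers.get(id);
            if (t == null || t.deleteVersion != NOT_DELETED) {
                return;
            }
            t.deleteVersion = stateTableVersion;
            if (snapshotVersions.isEmpty()) {
                timers.remove(id, t);
            }
        } finally {
            lock.readLock().unlock();
        }
    }

    /** Bumps the version under the write lock and returns it as the snapshot version. */
    int beginSnapshot() {
        lock.writeLock().lock();
        try {
            stateTableVersion++;
            snapshotVersions.add(stateTableVersion);
            return stateTableVersion;
        } finally {
            lock.writeLock().unlock();
        }
    }

    static boolean visibleTo(VersionedTimer t, int snapshotVersion) {
        int dv = t.deleteVersion;
        // Assumption: the create-version check applies to deleted timers too.
        return t.createVersion < snapshotVersion && (dv == NOT_DELETED || dv >= snapshotVersion);
    }

    /** Would run on the async thread; returns the ids a real snapshot would serialize. */
    List<String> snapshot(int snapshotVersion) {
        int oldest;
        lock.readLock().lock();
        try {
            oldest = snapshotVersions.isEmpty() ? Integer.MAX_VALUE : snapshotVersions.first();
        } finally {
            lock.readLock().unlock();
        }
        List<String> out = new ArrayList<>();
        for (VersionedTimer t : timers.values()) {
            String id = id(t.key, t.namespace, t.timestamp);
            int dv = t.deleteVersion;
            if (dv != NOT_DELETED && dv < oldest) {
                timers.remove(id, t);        // no running snapshot needs this timer any more
            } else if (visibleTo(t, snapshotVersion)) {
                out.add(id);
            }
        }
        lock.writeLock().lock();
        try {
            snapshotVersions.remove(snapshotVersion);
        } finally {
            lock.writeLock().unlock();
        }
        Collections.sort(out);
        return out;
    }

    int size() {
        return timers.size();
    }
}
```

In this sketch, registration and deletion take only the read lock because they merely read stateTableVersion and snapshotVersions; the write lock is reserved for starting and finishing a snapshot, which matches the locking described in the issue.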
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)