[ 
https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476884#comment-16476884
 ] 

ASF GitHub Bot commented on FLINK-9182:
---------------------------------------

GitHub user makeyang opened a pull request:

    https://github.com/apache/flink/pull/6019

    [FLINK-9182]async checkpoints for timer service

    ## What is the purpose of the change
    
    This change makes checkpoints for the timer service asynchronous.
    The whole idea is based on the discussion in the previous PR for FLINK-9182:
    https://github.com/apache/flink/pull/5908
    
    ## Brief change log
    
    Sync part: take a flat copy of the internal array of the priority queue.
    Async part: build the key groups from the copy and write out each timer key group.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
      - The S3 file system connector: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/makeyang/flink FLINK-9182-version2

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6019.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6019
    
----
commit 82799922203bd6cb959c11336f71aee4def431d7
Author: makeyang <makeyang@...>
Date:   2018-05-16T05:44:16Z

    [FLINK-9182]async checkpoints for timer service
    The whole idea is based on the discussion on GitHub: 
https://github.com/apache/flink/pull/5908
    The idea was proposed by StefanRRichter, as quoted below:
    "Second, I would probably suggest a simpler model for the async snapshots. 
You dropped the idea of making flat copies, but I wonder if this was premature. 
I can see that calling set.toArray(...) per keygroup could (maybe) turn out a 
bit slow because it has to potentially iterate and flatten linked entries. 
However, with async snapshots, we could get rid of the key-group partitioning 
of sets, and instead do a flat copy of the internal array of the priority 
queue. This would translate to just a single memcopy call internally, which is 
very efficient. In the async part, we can still partition the timers by 
key-group in a similar way as the copy-on-write state table does. This would 
avoid slowing down the event processing path (in fact improving it by unifying 
the sets) and also keep the approach very straightforward and less invasive."
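
A minimal Java sketch of the two phases described in the quote, not the code in this PR. The names `TimerSnapshotSketch`, `Timer`, `keyGroupOf` and `snapshot` are hypothetical, and the key-group assignment is simplified to a plain hash modulo, which is not Flink's actual assignment. The synchronous part is a single shallow array copy. The asynchronous part partitions the copied timers by key group.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

final class TimerSnapshotSketch {

    /** Hypothetical stand-in for a timer: only the fields this sketch needs. */
    static final class Timer {
        final String key;
        final long timestamp;

        Timer(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    /** Simplified key-group assignment (assumption: plain hash modulo). */
    static int keyGroupOf(String key, int numKeyGroups) {
        return Math.floorMod(key.hashCode(), numKeyGroups);
    }

    /**
     * Sync part: one flat (shallow) copy of the first {@code size} slots of the
     * queue's backing array. The returned Supplier is the async part: it only
     * reads the copy, so it can run on another thread while the queue changes.
     */
    static Supplier<List<List<Timer>>> snapshot(Timer[] queueArray, int size, int numKeyGroups) {
        final Timer[] copy = Arrays.copyOf(queueArray, size);
        return () -> {
            List<List<Timer>> byKeyGroup = new ArrayList<>();
            for (int i = 0; i < numKeyGroups; i++) {
                byKeyGroup.add(new ArrayList<>());
            }
            for (Timer t : copy) {
                byKeyGroup.get(keyGroupOf(t.key, numKeyGroups)).add(t);
            }
            return byKeyGroup;
        };
    }

    public static void main(String[] args) {
        Timer[] queue = new Timer[8];
        queue[0] = new Timer("user-1", 100L);
        queue[1] = new Timer("user-2", 200L);
        queue[2] = new Timer("user-3", 300L);

        Supplier<List<List<Timer>>> asyncPart = snapshot(queue, 3, 4);
        List<List<Timer>> groups = asyncPart.get(); // would run on a checkpoint thread
        for (int g = 0; g < groups.size(); g++) {
            System.out.println("key group " + g + ": " + groups.get(g).size() + " timer(s)");
        }
    }
}
```

Because the copy is shallow, this only works if the timer objects themselves are never mutated after registration, which is the assumption made here.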

----


> async checkpoints for timer service
> -----------------------------------
>
>                 Key: FLINK-9182
>                 URL: https://issues.apache.org/jira/browse/FLINK-9182
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.5.0, 1.4.2
>            Reporter: makeyang
>            Assignee: makeyang
>            Priority: Minor
>             Fix For: 1.4.3, 1.5.1
>
>
> # problem description:
>  ## as the number of 'InternalTimer' objects grows, checkpoints become 
> slower and slower
>  # improvement design
>  ## maintain a stateTableVersion and a set of snapshotVersions in 
> InternalTimeServiceManager, each exactly the same thing as its counterpart 
> in CopyOnWriteStateTable. one more thing: a read-write lock, which is used 
> to protect snapshotVersions and stateTableVersion
>  ## for each InternalTimer, add 2 more properties, create version and delete 
> version, besides the 3 existing properties: timestamp, key and namespace. 
> each time a timer is registered in the timer service, it is created with the 
> current stateTableVersion as its create version and -1 as its delete 
> version. each time a timer is deleted in the timer service, it is only 
> marked as deleted by setting its delete version to the current 
> stateTableVersion; it is not physically deleted from the timer service.
>  ## each time a snapshot of the timers is taken, InternalTimeServiceManager 
> increases its stateTableVersion and adds the new value to snapshotVersions. 
> these 2 operations are protected by the write lock of 
> InternalTimeServiceManager. the new stateTableVersion is taken as the 
> snapshot version of this snapshot
>  ## make a shallow copy of the <String,HeapInternalTimerService> tuples
>  ## then use another thread to asynchronously snapshot everything: the key 
> serializer, the namespace serializer and the timers. a timer that is not 
> deleted (delete version is -1) and whose create version is less than the 
> snapshot version is serialized. a timer whose delete version is not -1 and 
> is greater than or equal to the snapshot version is also serialized. any 
> other timer is not serialized by this snapshot.
>  ## when everything is serialized, remove the snapshot version from 
> snapshotVersions. this still happens in the other thread and is guarded 
> by the write lock.
>  ## last thing: physical deletion of timers. there are 2 places where timers 
> are physically deleted. the first is deletion in the timer service: after 
> the timer has been marked as deleted by setting its delete version to 
> stateTableVersion, check whether snapshotVersions is empty (which means 
> there is no running snapshot) and, if so, delete the timer. the other place 
> is the iteration over the timers during a snapshot: when a timer's delete 
> version is less than the min value of snapshotVersions, the timer is deleted 
> and no running snapshot needs to keep it, so it is removed.
>  ## some more additions: processingTimeTimers and eventTimeTimers for each 
> group used to be a HashSet; each is now a ConcurrentHashMap with 
> key+namespace+timestamp as its hash key.
>  # related mailing list thread
>  ## 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Slow-flink-checkpoint-td18946.html
>  # github pull request
>  ## //coming soon
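
The versioning rules in the description above can be sketched in Java as follows. This is only an illustration under stated assumptions: `TimerVersioningSketch`, `VersionedTimer`, `isVisibleTo` and `canPhysicallyDelete` are hypothetical names, the read-write lock is omitted, and the two serialization cases are transcribed literally from the description. In particular, the description does not say whether a deleted timer must also have a create version below the snapshot version, so no such check is made here.

```java
import java.util.TreeSet;

final class TimerVersioningSketch {

    /** Delete version of a timer that has not been deleted. */
    static final long NOT_DELETED = -1L;

    /** Hypothetical timer carrying only the two version properties. */
    static final class VersionedTimer {
        final long createVersion;
        long deleteVersion = NOT_DELETED;

        VersionedTimer(long createVersion) {
            this.createVersion = createVersion;
        }
    }

    /** Should a snapshot with the given snapshot version serialize this timer? */
    static boolean isVisibleTo(VersionedTimer t, long snapshotVersion) {
        if (t.deleteVersion == NOT_DELETED) {
            // Live timer: serialized only if created before the snapshot started.
            return t.createVersion < snapshotVersion;
        }
        // Logically deleted timer: serialized only if deleted at or after the snapshot version.
        return t.deleteVersion >= snapshotVersion;
    }

    /**
     * A logically deleted timer can be physically removed when no snapshot is
     * running, or when it was deleted before the oldest running snapshot.
     */
    static boolean canPhysicallyDelete(VersionedTimer t, TreeSet<Long> snapshotVersions) {
        if (t.deleteVersion == NOT_DELETED) {
            return false;
        }
        return snapshotVersions.isEmpty() || t.deleteVersion < snapshotVersions.first();
    }
}
```

With stateTableVersion 0 before a snapshot and snapshot version 1, a timer created at version 0 is serialized even if it is deleted while the snapshot runs (delete version 1), while a timer registered during the snapshot (create version 1) is skipped.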



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
