[
https://issues.apache.org/jira/browse/FLINK-5544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15874300#comment-15874300
]
ASF GitHub Bot commented on FLINK-5544:
---------------------------------------
GitHub user shixiaogang opened a pull request:
https://github.com/apache/flink/pull/3359
[FLINK-5544][streaming] Add InternalTimerService implemented in RocksDB
- Refactor the methods defined in `InternalTimerService`. Some common
implementation from `HeapInternalTimerService` has been moved into
`InternalTimerService`.
- Implement `RocksDBInternalTimerService`, which stores timers in RocksDB
and sorts them with an in-memory heap.
- Implement `InternalTimerServiceTestBase` to verify the implementation of
`InternalTimerService`.
- Update `AbstractStreamOperator` to allow the usage of customized
`InternalTimerService`.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/alibaba/flink flink-5544
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/3359.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #3359
----
commit 341fd97c47336d4f87cea997e134af68f8ef5265
Author: xiaogang.sxg
Date: 2017-02-20T09:55:40Z
Add InternalTimerService implemented in RocksDB
----
> Implement Internal Timer Service in RocksDB
> -------------------------------------------
>
> Key: FLINK-5544
> URL: https://issues.apache.org/jira/browse/FLINK-5544
> Project: Flink
> Issue Type: New Feature
> Components: Streaming
> Reporter: Xiaogang Shi
> Assignee: Xiaogang Shi
>
> Currently the only implementation of the internal timer service is
> HeapInternalTimerService, which stores all timers in memory. When the
> number of keys is very large, the timer service consumes too much
> memory. An implementation that stores timers in RocksDB seems well suited
> to these cases.
> It might be a little challenging to implement a RocksDB timer service because
> the timers are accessed in different ways. When timers are triggered, we need
> to access timers in the order of timestamp. But when performing checkpoints,
> we must have a method to obtain all timers of a given key group.
> A good implementation, as suggested by [~StephanEwen], follows the idea of
> merge sorting. We can store timers in RocksDB with the format
> {{KEY_GROUP#TIMER#KEY}}. In this way, the timers under a key group are put
> together and are sorted.
> Then we can deploy an in-memory heap which keeps the first timer of each key
> group to get the next timer to trigger. When a key group's first timer is
> updated, we can efficiently update the heap.
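The merge-sort idea in the description above can be sketched roughly as follows. This is only an illustration, not the code in the pull request: a `TreeSet` ordered by (key group, timestamp, key) stands in for the sorted RocksDB key range, and the class and method names (`KeyGroupTimerSketch`, `register`, `pollNext`, `timersOf`) are made up for the example.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.TreeSet;

/** Illustrative sketch only; a TreeSet plays the role of the sorted RocksDB store. */
class KeyGroupTimerSketch {

    static final class Timer {
        final int keyGroup;
        final long timestamp;
        final String key;

        Timer(int keyGroup, long timestamp, String key) {
            this.keyGroup = keyGroup;
            this.timestamp = timestamp;
            this.key = key;
        }
    }

    // Store order mirrors the KEY_GROUP#TIMER#KEY layout: timers of one key
    // group are contiguous and sorted by timestamp.
    private static final Comparator<Timer> STORE_ORDER = (a, b) -> {
        int c = Integer.compare(a.keyGroup, b.keyGroup);
        if (c == 0) {
            c = Long.compare(a.timestamp, b.timestamp);
        }
        return c != 0 ? c : a.key.compareTo(b.key);
    };

    // Firing order across key groups: earliest timestamp first.
    private static final Comparator<Timer> FIRE_ORDER = (a, b) -> {
        int c = Long.compare(a.timestamp, b.timestamp);
        return c != 0 ? c : a.key.compareTo(b.key);
    };

    private final TreeSet<Timer> store = new TreeSet<>(STORE_ORDER);
    private final Map<Integer, Timer> heads = new HashMap<>();
    // The heap holds at most one entry per key group: that group's first timer.
    private final PriorityQueue<Timer> heap = new PriorityQueue<>(FIRE_ORDER);

    void register(int keyGroup, long timestamp, String key) {
        store.add(new Timer(keyGroup, timestamp, key));
        refreshHead(keyGroup);
    }

    /** Removes and returns the next timer to trigger, or null if none is left. */
    Timer pollNext() {
        Timer next = heap.peek();
        if (next == null) {
            return null;
        }
        store.remove(next);
        refreshHead(next.keyGroup);
        return next;
    }

    /** All timers of one key group, as a checkpoint would need them. */
    List<Timer> timersOf(int keyGroup) {
        // Range scan [keyGroup, keyGroup + 1); overflow at Integer.MAX_VALUE is ignored here.
        return new ArrayList<>(store.subSet(
                new Timer(keyGroup, Long.MIN_VALUE, ""),
                new Timer(keyGroup + 1, Long.MIN_VALUE, "")));
    }

    // Re-reads the first timer of a key group and updates the heap if it changed.
    private void refreshHead(int keyGroup) {
        Timer first = store.ceiling(new Timer(keyGroup, Long.MIN_VALUE, ""));
        if (first != null && first.keyGroup != keyGroup) {
            first = null;
        }
        Timer old = (first == null) ? heads.remove(keyGroup) : heads.put(keyGroup, first);
        if (old != first) {
            if (old != null) {
                heap.remove(old); // linear in the number of key groups
            }
            if (first != null) {
                heap.add(first);
            }
        }
    }
}
```

In this sketch the heap never grows beyond the number of key groups, while the full set of timers lives in the sorted store, so triggering in timestamp order and scanning one key group for a checkpoint are both served by the same layout.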