GitHub user StefanRRichter opened a pull request:

    https://github.com/apache/flink/pull/6228

    [FLINK-9491] Implement timer data structure based on RocksDB

    ## What is the purpose of the change
    
    This PR is another step towards integrating the timer state with the keyed
    state backends.
    
    First, the PR generalizes the data structure `InternalTimerHeap` to
    `InternalPriorityQueue`, so that the functionality of a heap-set-organized
    state is decoupled from storing timers. The main reason for this is that
    state-backend-related code lives in flink-runtime, whereas timers are a
    concept from flink-streaming.
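    To illustrate what this decoupling means, here is a minimal Java sketch. It
    is not Flink's code. The names `SimplePriorityQueue`, `SimpleHeapQueue`,
    `SimpleHeapQueueSet` and `Timer` are hypothetical, and both heaps are backed
    by `java.util.PriorityQueue` instead of a hand-written array heap. The queue
    only knows a comparator, so a timer becomes just one possible element type.
    The set variant mirrors the split into a plain heap and a heap with set
    semantics.

    ```java
    import java.util.Comparator;
    import java.util.HashSet;
    import java.util.Objects;
    import java.util.PriorityQueue;
    import java.util.Set;

    public class PriorityQueueSketch {

        /** Generic queue contract: ordering comes from a comparator only; nothing is timer-specific. */
        interface SimplePriorityQueue<T> {
            boolean add(T element);
            T peek();
            T poll();
            int size();
        }

        /** Heap without set semantics: equal elements may be enqueued more than once. */
        static class SimpleHeapQueue<T> implements SimplePriorityQueue<T> {
            private final PriorityQueue<T> heap;

            SimpleHeapQueue(Comparator<? super T> comparator) {
                this.heap = new PriorityQueue<>(comparator);
            }

            @Override public boolean add(T element) { return heap.add(element); }
            @Override public T peek() { return heap.peek(); }
            @Override public T poll() { return heap.poll(); }
            @Override public int size() { return heap.size(); }
        }

        /** Heap extended with set semantics: adding an already enqueued element is a no-op. */
        static class SimpleHeapQueueSet<T> extends SimpleHeapQueue<T> {
            private final Set<T> members = new HashSet<>();

            SimpleHeapQueueSet(Comparator<? super T> comparator) { super(comparator); }

            @Override public boolean add(T element) {
                // Only enqueue elements that are not already present.
                return members.add(element) && super.add(element);
            }

            @Override public T poll() {
                T head = super.poll();
                if (head != null) {
                    members.remove(head);
                }
                return head;
            }
        }

        /** A timer is just one possible element type: a (timestamp, key) pair with value equality. */
        static final class Timer {
            final long timestamp;
            final String key;

            Timer(long timestamp, String key) {
                this.timestamp = timestamp;
                this.key = key;
            }

            @Override public boolean equals(Object o) {
                if (!(o instanceof Timer)) {
                    return false;
                }
                Timer other = (Timer) o;
                return timestamp == other.timestamp && key.equals(other.key);
            }

            @Override public int hashCode() { return Objects.hash(timestamp, key); }
        }

        public static void main(String[] args) {
            SimplePriorityQueue<Timer> timers =
                new SimpleHeapQueueSet<>(Comparator.comparingLong((Timer t) -> t.timestamp));
            timers.add(new Timer(30L, "a"));
            timers.add(new Timer(10L, "b"));
            timers.add(new Timer(10L, "b")); // duplicate: ignored by the set variant
            System.out.println(timers.size());           // 2
            System.out.println(timers.poll().timestamp); // 10
        }
    }
    ```

    With the set variant, registering an identical timer twice has no effect.
    The plain heap would keep both copies.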
    
    Second, the PR introduces an implementation of `InternalPriorityQueue` with
    set semantics (i.e. the data structure we require to manage timers) that is
    based on RocksDB. State in RocksDB is always partitioned into key-groups, so
    the general idea is to organize the implementation as a heap-of-heaps. Each
    sub-heap holds the elements of exactly one key-group, and a super-heap
    merges the sub-heaps by priority across key-group boundaries. The
    implementation reuses the in-memory implementation of
    `InternalPriorityQueue` (without set properties) as the super-heap that
    holds the sub-heaps. Furthermore, each sub-heap is an instance of
    `CachingInternalPriorityQueueSet`, consisting of a "fast", "small" cache
    (`OrderedSetCache`) and a "slow", "unbounded" store (`OrderedSetStore`),
    currently with simple write-through synchronization between cache and
    store. In the current implementation, the cache is based on an AVL tree and
    is restricted in capacity, while the store is backed by a RocksDB column
    family. We use the cache to reduce read accesses to RocksDB.
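    The heap-of-heaps idea can be sketched as follows. This is a simplified,
    in-memory illustration and not the PR's `KeyGroupPartitionedPriorityQueue`.
    It assumes hypothetical `KeyGroupHeapSketch`, `SubHeap` and `Element`
    classes, key-groups numbered 0..n-1, and `java.util.PriorityQueue` on both
    levels. The super-heap ranks each sub-heap by its current head, so polling
    the super-heap yields the globally smallest element across key-group
    boundaries.

    ```java
    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;
    import java.util.PriorityQueue;

    public class KeyGroupHeapSketch {

        /** Hypothetical queue element: a timestamp that belongs to exactly one key-group. */
        static final class Element {
            final int keyGroup;
            final long timestamp;

            Element(int keyGroup, long timestamp) {
                this.keyGroup = keyGroup;
                this.timestamp = timestamp;
            }
        }

        /** Sub-heap: holds the elements of a single key-group, ordered by timestamp. */
        static final class SubHeap {
            final PriorityQueue<Element> elements =
                new PriorityQueue<>(Comparator.comparingLong((Element e) -> e.timestamp));

            /** A sub-heap's priority is that of its head; empty sub-heaps sort last. */
            long headTimestamp() {
                Element head = elements.peek();
                return head == null ? Long.MAX_VALUE : head.timestamp;
            }
        }

        private final List<SubHeap> subHeapsByKeyGroup = new ArrayList<>();

        /** Super-heap: orders the sub-heaps by their current head element. */
        private final PriorityQueue<SubHeap> superHeap =
            new PriorityQueue<>(Comparator.comparingLong((SubHeap s) -> s.headTimestamp()));

        KeyGroupHeapSketch(int numKeyGroups) {
            for (int i = 0; i < numKeyGroups; i++) {
                SubHeap subHeap = new SubHeap();
                subHeapsByKeyGroup.add(subHeap);
                superHeap.add(subHeap);
            }
        }

        void add(Element element) {
            SubHeap subHeap = subHeapsByKeyGroup.get(element.keyGroup);
            // Adding may change the sub-heap's head (its priority), so take it out of
            // the super-heap before mutating it and put it back afterwards.
            superHeap.remove(subHeap);
            subHeap.elements.add(element);
            superHeap.add(subHeap);
        }

        /** Returns the globally smallest element across all key-groups, or null if empty. */
        Element poll() {
            SubHeap top = superHeap.poll();
            if (top == null) {
                return null; // no key-groups at all
            }
            Element result = top.elements.poll(); // null if every sub-heap is empty
            superHeap.add(top);
            return result;
        }

        public static void main(String[] args) {
            KeyGroupHeapSketch queue = new KeyGroupHeapSketch(3);
            queue.add(new Element(0, 30L));
            queue.add(new Element(2, 10L));
            queue.add(new Element(1, 20L));
            queue.add(new Element(2, 5L));
            // Prints 2:5, 2:10, 1:20, 0:30 (one per line).
            for (Element e = queue.poll(); e != null; e = queue.poll()) {
                System.out.println(e.keyGroup + ":" + e.timestamp);
            }
        }
    }
    ```

    `java.util.PriorityQueue` cannot re-sort an element whose priority changes
    in place. For that reason the sketch removes a sub-heap from the super-heap
    before mutating it and re-inserts it afterwards. That `remove` call is
    linear in the number of key-groups. A tuned heap would instead track each
    sub-heap's position, so it could restore order in logarithmic time.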
    
    Please note that the RocksDB implementation is not yet integrated with the
    timer service or the backend. This will happen in the next steps.
    
    ## Brief change log
    
    - Refactored `InternalTimerHeap` to decouple it from timers and moved the
      data structures from flink-streaming to flink-runtime
      (-> `InternalPriorityQueue`).
    - Split the data structure into a hierarchy: a heap without set semantics
      (`HeapPriorityQueue`) and a heap extended with set semantics
      (`HeapPriorityQueueSet`).
    - Introduced a RocksDB-based implementation of `InternalPriorityQueue` with
      set semantics. The starting point is `KeyGroupPartitionedPriorityQueue`.
      This class uses a `HeapPriorityQueue` of `CachingInternalPriorityQueueSet`
      elements, each of which contains the elements of exactly one key-group
      (heap-of-heaps). For RocksDB, we configure each
      `CachingInternalPriorityQueueSet` to use a `TreeOrderedSetCache` and a
      `RocksDBOrderedStore`.
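    A per-key-group combination of a bounded cache and an unbounded ordered
    store with write-through synchronization might look roughly like the sketch
    below. It is an illustration under stated assumptions, not the PR's
    `CachingInternalPriorityQueueSet`:
    - the class name `CachingQueueSetSketch` is hypothetical;
    - elements are plain `long` timestamps;
    - the cache is a `java.util.TreeSet`, which is a red-black tree rather than
      an AVL tree;
    - an in-memory `TreeSet` stands in for the `RocksDBOrderedStore`;
    - the eviction and refill policy is our own choice.

    The sketch keeps one invariant: the cache always holds the smallest
    elements of the store. Because of that, `peek` and `poll` only scan the
    store when the cache runs empty.

    ```java
    import java.util.TreeSet;

    public class CachingQueueSetSketch {

        private final int cacheCapacity;
        /** Small, bounded, in-memory ordered cache; always a prefix of the store's order. */
        private final TreeSet<Long> cache = new TreeSet<>();
        /** Stand-in for the unbounded, ordered store (assumption: RocksDB is not used here). */
        private final TreeSet<Long> store = new TreeSet<>();
        /** Number of ordered scans over the store, i.e. the reads the cache tries to avoid. */
        int storeScans = 0;

        CachingQueueSetSketch(int cacheCapacity) {
            if (cacheCapacity < 1) {
                throw new IllegalArgumentException("cache capacity must be >= 1");
            }
            this.cacheCapacity = cacheCapacity;
        }

        /** Set semantics: returns false if the element is already present. */
        boolean add(long element) {
            boolean cacheHeldEverything = cache.size() == store.size();
            if (!store.add(element)) { // write-through: every add goes to the store
                return false;
            }
            // Keep the invariant "cache = smallest elements of the store".
            if (cacheHeldEverything || (!cache.isEmpty() && element < cache.last())) {
                cache.add(element);
                if (cache.size() > cacheCapacity) {
                    cache.pollLast(); // the evicted element still lives in the store
                }
            }
            return true;
        }

        /** Removes an arbitrary element (e.g. a deleted timer) from both layers. */
        boolean remove(long element) {
            cache.remove(element);
            return store.remove(element); // write-through
        }

        Long peek() {
            refillIfEmpty();
            return cache.isEmpty() ? null : cache.first();
        }

        Long poll() {
            refillIfEmpty();
            Long head = cache.pollFirst();
            if (head != null) {
                store.remove(head); // write-through
            }
            return head;
        }

        /** On a cache miss, load the next batch of smallest elements from the store. */
        private void refillIfEmpty() {
            if (!cache.isEmpty() || store.isEmpty()) {
                return;
            }
            storeScans++;
            for (Long element : store) { // ascending order, like an ordered range scan
                if (cache.size() >= cacheCapacity) {
                    break;
                }
                cache.add(element);
            }
        }

        public static void main(String[] args) {
            CachingQueueSetSketch queue = new CachingQueueSetSketch(2);
            for (long ts : new long[] {50L, 10L, 40L, 30L}) {
                queue.add(ts);
            }
            System.out.println(queue.add(10L)); // false: already present
            StringBuilder order = new StringBuilder();
            for (Long ts = queue.poll(); ts != null; ts = queue.poll()) {
                order.append(ts).append(' ');
            }
            System.out.println(order.toString().trim()); // 10 30 40 50
            System.out.println(queue.storeScans);        // 1
        }
    }
    ```

    In this sketch, every write goes to the store immediately and reads are
    served from the cache. In the usage example, four polls trigger only a
    single ordered scan of the store.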
    
    
    ## Verifying this change
    
    I added dedicated tests for all data structures.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (yes, fastutil)
      - The public API, i.e., is any changed class annotated with
        `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (yes)
      - Anything that affects deployment or recovery: JobManager (and its
        components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
      - The S3 file system connector: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StefanRRichter/flink heapAbstractionsRocks

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6228.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6228
    
----
commit b3261a15bdf15207f50e2832a048fb3c84b8f642
Author: Stefan Richter <s.richter@...>
Date:   2018-06-19T08:01:30Z

    Introduce MAX_ARRAY_SIZE as general constant

commit dd64cbb7eb15317a4dc8f0626c50dffd58e6b5f9
Author: Stefan Richter <s.richter@...>
Date:   2018-06-18T12:38:01Z

    Generalization of timer queue to a queue(set) that is no longer coupled to
    timers and implementation for RocksDB

----

