[https://issues.apache.org/jira/browse/FLINK-9486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16535082#comment-16535082]
ASF GitHub Bot commented on FLINK-9486:
---------------------------------------
GitHub user StefanRRichter opened a pull request:
https://github.com/apache/flink/pull/6276
[FLINK-9486] Introduce TimerState in keyed state backend
## What is the purpose of the change
This PR integrates `InternalTimerQueue` with the keyed state backends (Heap and
RocksDB), so that the RocksDB-based version can be used in a job for the first
time.
We introduce the interface `KeyGroupPartitionedPriorityQueue` as a simple
adapter to the existing snapshotting code. It can probably be removed in a
follow-up PR, once the queues are fully integrated with the backend's snapshotting.
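As a rough illustration of why a key-group-partitioned queue fits the existing snapshotting code, here is a minimal Java sketch. The class and method names are hypothetical, not the PR's actual interface, and the key-group assignment function is supplied by the caller (the usage example simply uses a modulo). It keeps one sub-queue per key group, so a snapshot can read the elements of each key group separately while `poll()` still returns the global minimum. For simplicity it scans all key-group heads on every poll; a real implementation would more likely keep a heap of the heads.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.ToIntFunction;

/** Hypothetical sketch: one priority queue per key group, plus a global poll. */
final class KeyGroupPartitionedQueueSketch<T> {
    private final List<PriorityQueue<T>> partitions = new ArrayList<>();
    private final ToIntFunction<T> keyGroupOf; // assumption: caller maps each element to its key group
    private final Comparator<T> comparator;

    KeyGroupPartitionedQueueSketch(int numKeyGroups, ToIntFunction<T> keyGroupOf, Comparator<T> comparator) {
        for (int i = 0; i < numKeyGroups; i++) {
            partitions.add(new PriorityQueue<>(comparator));
        }
        this.keyGroupOf = keyGroupOf;
        this.comparator = comparator;
    }

    /** Adds the element to the sub-queue of its key group. */
    void add(T element) {
        partitions.get(keyGroupOf.applyAsInt(element)).add(element);
    }

    /** Removes and returns the smallest element across all key groups, or null if empty. */
    T poll() {
        PriorityQueue<T> best = null;
        for (PriorityQueue<T> partition : partitions) {
            T head = partition.peek();
            if (head != null && (best == null || comparator.compare(head, best.peek()) < 0)) {
                best = partition;
            }
        }
        return best == null ? null : best.poll();
    }

    /** What per-key-group snapshotting would iterate: the (unordered) elements of one key group. */
    List<T> elementsOfKeyGroup(int keyGroup) {
        return new ArrayList<>(partitions.get(keyGroup));
    }
}
```

The per-key-group split is what makes the adapter cheap: snapshotting code that already writes state key group by key group can ask for exactly one partition.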
The PR also addresses an issue with the `TreeOrderedCache`, which requires a
"proper" `Comparator`, i.e. one that is consistent with `Object#equals`
(implemented in `TieBreakingPriorityComparator`), and we introduce
`PriorityComparator` to emphasize this difference.
`TieBreakingPriorityComparator` is likely to go away as well once we come up
with an improved caching strategy that is not simply based on a tree.
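The sketch below, assuming a hypothetical `SimpleTimer` with a timestamp and a key, shows why a priority-only comparator is not enough for a tree-based collection: `java.util.TreeSet` treats `compare(a, b) == 0` as equality, so a second timer with the same timestamp but a different key is silently dropped. Breaking ties on the remaining fields makes the comparator consistent with `equals`. This only mirrors the idea behind `TieBreakingPriorityComparator`; it is not its implementation.

```java
import java.util.Comparator;
import java.util.Objects;
import java.util.TreeSet;

/** Hypothetical timer: equality covers both the timestamp and the key. */
final class SimpleTimer {
    final long timestamp;
    final String key;

    SimpleTimer(long timestamp, String key) {
        this.timestamp = timestamp;
        this.key = key;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SimpleTimer)) {
            return false;
        }
        SimpleTimer other = (SimpleTimer) o;
        return timestamp == other.timestamp && key.equals(other.key);
    }

    @Override
    public int hashCode() {
        return Objects.hash(timestamp, key);
    }
}

final class TieBreakingDemo {
    // Priority only: distinct timers with equal timestamps compare as 0.
    static final Comparator<SimpleTimer> BY_PRIORITY =
        Comparator.comparingLong((SimpleTimer t) -> t.timestamp);

    // Priority first, then the key as tie-breaker: compare(a, b) == 0 exactly when a.equals(b).
    static final Comparator<SimpleTimer> TIE_BREAKING =
        BY_PRIORITY.thenComparing((SimpleTimer t) -> t.key);

    /** Number of elements a TreeSet built with the given comparator retains. */
    static int retainedBy(Comparator<SimpleTimer> comparator, SimpleTimer... timers) {
        TreeSet<SimpleTimer> set = new TreeSet<>(comparator);
        for (SimpleTimer t : timers) {
            set.add(t);
        }
        return set.size();
    }
}
```

For two timers that share timestamp 10 but belong to keys "a" and "b", `retainedBy(BY_PRIORITY, ...)` returns 1 and `retainedBy(TIE_BREAKING, ...)` returns 2. A heap does not have this problem, which is why the priority-only ordering can stay the primary notion and the tie-breaking variant is only needed as an adapter.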
We introduce `PriorityQueueSetFactory` to the keyed state backends, and
this is where the queues are built. Until the queues are fully integrated with
the backend's snapshotting, the current RocksDB implementation uses an
additional RocksDB instance, because otherwise we run into trouble with
incremental snapshots.
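A minimal sketch of the factory idea, with hypothetical names (`SketchPriorityQueueFactory`, `HeapSketchQueueFactory`) and a deliberately reduced queue interface rather than the PR's actual signatures: the backend creates each queue under a state name and keeps a reference to it, which is what later lets it include the queue in snapshots like any other keyed state. A RocksDB-based factory would implement the same interface.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

/** Hypothetical minimal queue abstraction handed out by the backend. */
interface SketchPriorityQueue<T> {
    void add(T element);
    T poll();
    int size();
}

/** Hypothetical factory: the backend creates (and registers) queues by state name. */
interface SketchPriorityQueueFactory {
    <T> SketchPriorityQueue<T> create(String stateName, Comparator<T> comparator);
}

/** Heap-based factory; a RocksDB-based factory would implement the same interface. */
final class HeapSketchQueueFactory implements SketchPriorityQueueFactory {
    // Registered queues, keyed by state name, so that snapshotting code can find them later.
    private final Map<String, SketchPriorityQueue<?>> registered = new HashMap<>();

    @Override
    public <T> SketchPriorityQueue<T> create(String stateName, Comparator<T> comparator) {
        if (registered.containsKey(stateName)) {
            throw new IllegalStateException("Queue already registered: " + stateName);
        }
        PriorityQueue<T> heap = new PriorityQueue<>(comparator);
        SketchPriorityQueue<T> queue = new SketchPriorityQueue<T>() {
            @Override public void add(T element) { heap.add(element); }
            @Override public T poll() { return heap.poll(); }
            @Override public int size() { return heap.size(); }
        };
        registered.put(stateName, queue);
        return queue;
    }

    boolean isRegistered(String stateName) {
        return registered.containsKey(stateName);
    }
}
```

Routing creation through the backend, instead of letting the timer service allocate its own heap, is the design point: whoever creates the queue knows about it and can therefore snapshot and restore it.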
A configuration parameter is introduced to choose the implementation of
queues for RocksDB; for now, the default is still the heap variant.
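The configuration choice boils down to mapping an option value to one of two implementations, with heap as the fallback. The sketch below assumes hypothetical enum constants `HEAP` and `ROCKSDB` and a case-insensitive string value; the actual accepted values of the option introduced in this PR are not stated here.

```java
import java.util.Locale;

/** Hypothetical selector for the priority queue implementation used with RocksDB. */
enum QueueImplSketch {
    HEAP,
    ROCKSDB;

    /** Parses a configured value; an absent value falls back to HEAP, mirroring the default described above. */
    static QueueImplSketch parse(String configured) {
        if (configured == null || configured.trim().isEmpty()) {
            return HEAP;
        }
        // Enum.valueOf throws IllegalArgumentException for unknown values, so typos fail fast.
        return valueOf(configured.trim().toUpperCase(Locale.ROOT));
    }
}
```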
Finally, we introduce an additional method for bulk polling in the
`InternalTimerQueue` interface that opens up future optimizations.
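One way such a bulk poll can look, sketched here with a hypothetical `bulkPoll(Predicate, Consumer)` signature on a plain heap (the PR's exact signature is not shown above): it removes head elements as long as the predicate accepts them, which matches the typical timer use case of firing every timer whose timestamp is at or before the current watermark.

```java
import java.util.PriorityQueue;
import java.util.function.Consumer;
import java.util.function.Predicate;

/** Hypothetical heap-backed queue with a bulk poll operation. */
final class BulkPollQueueSketch<T extends Comparable<T>> {
    private final PriorityQueue<T> heap = new PriorityQueue<>();

    void add(T element) {
        heap.add(element);
    }

    /** Polls head elements while canConsume accepts them, handing each one to consumer in order. */
    void bulkPoll(Predicate<T> canConsume, Consumer<T> consumer) {
        T head;
        while ((head = heap.peek()) != null && canConsume.test(head)) {
            consumer.accept(heap.poll());
        }
    }

    int size() {
        return heap.size();
    }
}
```

For a heap this is equivalent to repeated `peek()`/`poll()` calls. The benefit would come from store-backed queues: a single call describes the whole batch, so an implementation could, in principle, serve it with one range scan and one batched delete instead of one round trip per element, which a single-element `poll()` cannot express.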
## Verifying this change
This change is already covered by existing tests, such as
`AbstractEventTimeWindowCheckpointingITCase`, but the RocksDB-based queues
currently have to be activated explicitly via
`RockDBBackendOptions.PRIORITY_QUEUE_STATE_TYPE`.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with
`@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (yes, if
activated)
- Anything that affects deployment or recovery: JobManager (and its
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)
## Documentation
- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/StefanRRichter/flink integrateSetStateWithBackends
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/6276.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #6276
----
commit 0d8743e52a658876425b6cef03fef3fef2d09deb
Author: Stefan Richter <s.richter@...>
Date: 2018-07-04T11:43:49Z
Remove read options from RocksDBOrderedSetStore
commit 84b1b36357322cf23d50396cbfa0725db95797ea
Author: Stefan Richter <s.richter@...>
Date: 2018-07-04T11:51:14Z
Introduce (temporary?/visible for testing) KeyGroupPartitionedPriorityQueue
interface to work with the existing snapshotting
commit 35e02705f6740854ae18a92b5a6dfbafd3201a8f
Author: Stefan Richter <s.richter@...>
Date: 2018-07-04T16:07:54Z
Basic integration with backends / make Rocks timers work
commit 1294ac356162430cf9de86980de1d4a0154f33b8
Author: Stefan Richter <s.richter@...>
Date: 2018-07-05T16:46:34Z
Introduce PriorityComparator and tie breaking variant as adapter to
collections that require a comparator.
This is required because the tree set that we use in the cache expects that
Comparators are aligned with Object#equals
commit bfd3a12e77348a79c91656d80a7a67ece9825103
Author: Stefan Richter <s.richter@...>
Date: 2018-07-05T19:35:08Z
Iterator directly from cache if no store-only elements.
commit fbf26f1f2efbe1e2029d09d297808e26e08b87d8
Author: Stefan Richter <s.richter@...>
Date: 2018-07-06T08:22:49Z
Use a dedicated RocksDB instance for priority queue state. We can revert
this once priority queue state is properly integrated with the
snapshotting. Until then, we must isolate the priority queue state in
a separate db or else incremental checkpoints will break.
commit 75cb05ab21e07eaed25e1cac048919f7f313b3f6
Author: Stefan Richter <s.richter@...>
Date: 2018-07-06T13:55:02Z
Configuration part
commit 7a86e268072ec4ad9d9fae2fa8e852b66d4424a8
Author: Stefan Richter <s.richter@...>
Date: 2018-07-06T14:48:53Z
Introduce bulk poll method in queue to open up future optimizations
----
> Introduce TimerState in keyed state backend
> -------------------------------------------
>
> Key: FLINK-9486
> URL: https://issues.apache.org/jira/browse/FLINK-9486
> Project: Flink
> Issue Type: Sub-task
> Components: State Backends, Checkpointing
> Reporter: Stefan Richter
> Assignee: Stefan Richter
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.6.0
>
>
> This is the first implementation subtask.
> The goal of this PR is to introduce a timer state that is registered with the
> keyed state backend, similar to other forms of keyed state.
> For the {{HeapKeyedStateBackend}}, this state lives on the same level as the
> {{StateTable}} instances that hold other forms of keyed state, and the
> implementation is backed by {{InternalTimerHeap}}.
> For {{RocksDBKeyedStateBackend}}, in this first step, we also introduce this
> state outside of RocksDB, based on {{InternalTimerHeap}}. This is an
> intermediate step, and we will later also implement the alternative of storing
> the timers inside a column family in RocksDB. Taking this step first also
> means we can still offer the option of RocksDB state with heap-based timers.
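To illustrate what a heap-backed timer state typically needs beyond a plain `java.util.PriorityQueue`, here is a minimal sketch (hypothetical `TimerHeapSketch`, not Flink's `InternalTimerHeap`) of a binary min-heap combined with a map from element to heap index. The map gives set semantics, so registering the same timer twice is a no-op, and it allows deleting an arbitrary timer in O(log n) instead of a linear scan.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical heap-backed timer set: binary min-heap plus element-to-index map. */
final class TimerHeapSketch<T> {
    private final List<T> heap = new ArrayList<>();
    private final Map<T, Integer> indexOf = new HashMap<>(); // relies on T's equals/hashCode
    private final Comparator<T> comparator;

    TimerHeapSketch(Comparator<T> comparator) {
        this.comparator = comparator;
    }

    /** Returns false (and changes nothing) if an equal element is already present. */
    boolean add(T element) {
        if (indexOf.containsKey(element)) {
            return false;
        }
        heap.add(element);
        indexOf.put(element, heap.size() - 1);
        siftUp(heap.size() - 1);
        return true;
    }

    /** Removes and returns the smallest element, or null if empty. */
    T poll() {
        if (heap.isEmpty()) {
            return null;
        }
        T head = heap.get(0);
        removeAt(0);
        return head;
    }

    /** Removes an arbitrary element in O(log n) via the index map. */
    boolean remove(T element) {
        Integer idx = indexOf.get(element);
        if (idx == null) {
            return false;
        }
        removeAt(idx);
        return true;
    }

    int size() {
        return heap.size();
    }

    private void removeAt(int idx) {
        int last = heap.size() - 1;
        T removed = heap.get(idx);
        swap(idx, last);        // move the doomed element to the end
        heap.remove(last);      // remove(int): drop the last slot
        indexOf.remove(removed);
        if (idx < heap.size()) {
            // The element moved into idx may violate the heap order in either direction.
            siftDown(idx);
            siftUp(idx);
        }
    }

    private void siftUp(int i) {
        while (i > 0) {
            int parent = (i - 1) / 2;
            if (comparator.compare(heap.get(i), heap.get(parent)) >= 0) {
                return;
            }
            swap(i, parent);
            i = parent;
        }
    }

    private void siftDown(int i) {
        int n = heap.size();
        while (true) {
            int left = 2 * i + 1;
            int right = left + 1;
            int smallest = i;
            if (left < n && comparator.compare(heap.get(left), heap.get(smallest)) < 0) smallest = left;
            if (right < n && comparator.compare(heap.get(right), heap.get(smallest)) < 0) smallest = right;
            if (smallest == i) {
                return;
            }
            swap(i, smallest);
            i = smallest;
        }
    }

    /** Swaps two slots and keeps the index map in sync. */
    private void swap(int a, int b) {
        T ea = heap.get(a);
        T eb = heap.get(b);
        heap.set(a, eb);
        heap.set(b, ea);
        indexOf.put(eb, a);
        indexOf.put(ea, b);
    }
}
```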
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)