[
https://issues.apache.org/jira/browse/FLINK-9489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544091#comment-16544091
]
ASF GitHub Bot commented on FLINK-9489:
---------------------------------------
GitHub user StefanRRichter opened a pull request:
https://github.com/apache/flink/pull/6333
[FLINK-9489] Checkpoint timers as part of managed keyed state instead of
raw keyed state
## What is the purpose of the change
This PR integrates priority queue state (timers) with the snapshotting of
Flink's state backends and already includes backwards compatibility
(FLINK-9490). The core idea is to have a common abstraction for how state is
registered in the state backend and how snapshots operate on such state
(`StateSnapshotRestore`, `RegisteredStateMetaInfoBase`). With this, the new
state integrates more or less seamlessly with the existing snapshot logic. The
notable exception is the current lack of integration of the RocksDB state
backend with heap-based priority queue state for incremental snapshots. This
combination can still use the old snapshot code through a temporary path (see
`AbstractStreamOperator.snapshotState(...)`) without causing any regression.
As a result, after this PR Flink supports asynchronous snapshots for
heap kv / heap queue, rocks kv / rocks queue (full and incremental), and
rocks kv / heap queue (only full), and still uses synchronous snapshots for
rocks kv / heap queue (only incremental).
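To make the resulting support matrix easier to scan, here is a minimal sketch in plain Java that encodes it as a lookup. All type and method names (`SnapshotModeSketch`, `modeFor`, the enums) are made up for illustration and are not part of Flink. The heap kv / rocks queue combination is not mentioned above, so the sketch rejects it rather than guess.

```java
// Illustrative only: hypothetical names, not Flink API.
final class SnapshotModeSketch {

    enum KvState { HEAP, ROCKS }
    enum QueueState { HEAP, ROCKS }
    enum CheckpointType { FULL, INCREMENTAL }
    enum SnapshotMode { ASYNCHRONOUS, SYNCHRONOUS }

    /** Returns the snapshot mode the PR description lists for a combination. */
    static SnapshotMode modeFor(KvState kv, QueueState queue, CheckpointType type) {
        if (kv == KvState.HEAP && queue == QueueState.ROCKS) {
            // The description does not cover this combination.
            throw new IllegalArgumentException("heap kv / rocks queue is not described");
        }
        // The single exception: RocksDB kv state with heap-based timers
        // under incremental checkpoints still snapshots synchronously.
        if (kv == KvState.ROCKS
                && queue == QueueState.HEAP
                && type == CheckpointType.INCREMENTAL) {
            return SnapshotMode.SYNCHRONOUS;
        }
        // Everything else that is listed is asynchronous. The description
        // does not qualify heap kv by checkpoint type, so it is ignored there.
        return SnapshotMode.ASYNCHRONOUS;
    }

    public static void main(String[] args) {
        for (CheckpointType type : CheckpointType.values()) {
            System.out.println("rocks kv / heap queue / " + type + " -> "
                    + modeFor(KvState.ROCKS, QueueState.HEAP, type));
            System.out.println("rocks kv / rocks queue / " + type + " -> "
                    + modeFor(KvState.ROCKS, QueueState.ROCKS, type));
        }
        System.out.println("heap kv / heap queue -> "
                + modeFor(KvState.HEAP, QueueState.HEAP, CheckpointType.FULL));
    }
}
```

Only one branch returns `SYNCHRONOUS`, which mirrors the first item in the list of rough edges: once heap-based timers are integrated with incremental RocksDB snapshots, that branch and the temporary path behind it could go away.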
This work was created in a bit of a rush to make it into the 1.6 release
and still has some known rough edges that we could fix during the testing
phase. Here is a list of things that already come to mind:
- Integrate heap-based timers with incremental RocksDB snapshots, then kick
out some code.
- Check proper integration with serializer upgrade story (!!)
- After that, we can also remove the key-partitioning in the set structure
from `HeapPriorityQueueSet`.
- Improve integration of the batch wrapper.
- Improve general state registration logic in the backends, there is
potential to remove duplicated code, and generally still improve the
integration of the queue state a bit.
- Improve performance of RocksDB based timers, e.g. byte[] based cache,
seek directly to the next potential timer instead of seeking to the
key-group start, bulkPoll.
- Improve some class/interface/method names
- Add tests, e.g. bulkPoll etc.
## Verifying this change
This change is already covered by existing tests, but I would add some more
eventually. You can activate RocksDB based timers by using the RocksDB backend
and setting `RockDBBackendOptions.PRIORITY_QUEUE_STATE_TYPE` to `ROCKS`.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with
`@Public(Evolving)`: (no)
- The serializers: (yes)
- The runtime per-record code paths (performance sensitive): (yes)
- Anything that affects deployment or recovery: JobManager (and its
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
- The S3 file system connector: (no)
## Documentation
- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (JavaDocs only for now)
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/StefanRRichter/flink pq-snapshot-integration
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/6333.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #6333
----
commit 1bb8f70700deacc49a4d4ac7900425c10272959d
Author: Stefan Richter <s.richter@...>
Date: 2018-06-13T09:56:16Z
[FLINK-9489] Checkpoint timers as part of managed keyed state instead of
raw keyed state
commit fc20df8268decab6d9890d617787a4084284b2f0
Author: Stefan Richter <s.richter@...>
Date: 2018-07-13T23:19:30Z
Optimization for relaxed bulk polls
commit 4db1bea90fd6881555172fe3d22ee928e97894a7
Author: Stefan Richter <s.richter@...>
Date: 2018-07-14T06:34:16Z
Renaming of some classes/interfaces
----
> Checkpoint timers as part of managed keyed state instead of raw keyed state
> ---------------------------------------------------------------------------
>
> Key: FLINK-9489
> URL: https://issues.apache.org/jira/browse/FLINK-9489
> Project: Flink
> Issue Type: Sub-task
> Components: State Backends, Checkpointing
> Reporter: Stefan Richter
> Assignee: Stefan Richter
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.6.0
>
>
> Timer state should now become part of the keyed state backend snapshot, i.e.,
> stored inside the managed keyed state. This means that we have to connect our
> preparation for asynchronous checkpoints with the backend, so that the timers
> are written as part of the state for each key-group. It also frees up the raw
> keyed state, which we might expose to user functions in the future.
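
As a rough illustration of what writing timers "as part of the state for each key-group" could look like, the following plain-Java sketch keeps one timer queue per key-group and snapshots them group by group. It is not Flink code: `KeyGroupedTimersSketch`, `Timer`, and `keyGroupFor` are hypothetical names, and the key-group assignment is a simplified `hashCode` modulo rather than Flink's actual key-group hashing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Illustrative only: hypothetical names, not Flink API.
final class KeyGroupedTimersSketch {

    /** A timer for one key, ordered by timestamp. */
    static final class Timer implements Comparable<Timer> {
        final String key;
        final long timestamp;

        Timer(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }

        @Override
        public int compareTo(Timer other) {
            return Long.compare(timestamp, other.timestamp);
        }
    }

    // One priority queue per key-group, indexed by key-group id.
    private final List<PriorityQueue<Timer>> queuesByKeyGroup = new ArrayList<>();

    KeyGroupedTimersSketch(int numKeyGroups) {
        for (int i = 0; i < numKeyGroups; i++) {
            queuesByKeyGroup.add(new PriorityQueue<>());
        }
    }

    /** Simplified assumption: Flink's real assignment hashes differently. */
    static int keyGroupFor(String key, int numKeyGroups) {
        return Math.floorMod(key.hashCode(), numKeyGroups);
    }

    void register(String key, long timestamp) {
        int keyGroup = keyGroupFor(key, queuesByKeyGroup.size());
        queuesByKeyGroup.get(keyGroup).add(new Timer(key, timestamp));
    }

    /** Earliest timer of a key-group, or null if the group has none. */
    Timer peek(int keyGroup) {
        return queuesByKeyGroup.get(keyGroup).peek();
    }

    /** Copies the timers group by group; the live queues stay untouched. */
    List<List<Timer>> snapshot() {
        List<List<Timer>> copy = new ArrayList<>();
        for (PriorityQueue<Timer> queue : queuesByKeyGroup) {
            copy.add(new ArrayList<>(queue));
        }
        return copy;
    }

    public static void main(String[] args) {
        KeyGroupedTimersSketch timers = new KeyGroupedTimersSketch(4);
        timers.register("a", 10L);
        timers.register("a", 5L);
        timers.register("b", 7L);

        List<List<Timer>> snapshot = timers.snapshot();
        for (int g = 0; g < snapshot.size(); g++) {
            System.out.println("key-group " + g + ": "
                    + snapshot.get(g).size() + " timer(s)");
        }
    }
}
```

Because the copy is organized by key-group, each group's timers can be written next to the rest of that group's keyed state, which is the property the description asks for. Copying the queue contents up front is one simple way to let the actual write happen asynchronously; whether the real backend does it this way is not stated here.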