[ https://issues.apache.org/jira/browse/FLINK-9489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544675#comment-16544675 ]
ASF GitHub Bot commented on FLINK-9489:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/6333#discussion_r202553765
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ---
@@ -342,16 +379,20 @@ private void restorePartitionedState(Collection<KeyedStateHandle> state) throws
                 for (StateMetaInfoSnapshot restoredMetaInfo : restoredMetaInfos) {
                     restoredKvStateMetaInfos.put(restoredMetaInfo.getName(), restoredMetaInfo);
-                    StateTable<K, ?, ?> stateTable = stateTables.get(restoredMetaInfo.getName());
+                    StateSnapshotRestore snapshotRestore = registeredStates.get(restoredMetaInfo.getName());
                     //important: only create a new table we did not already create it previously
-                    if (null == stateTable) {
+                    if (null == snapshotRestore) {
-                        RegisteredKeyedBackendStateMetaInfo<?, ?> registeredKeyedBackendStateMetaInfo =
-                            new RegisteredKeyedBackendStateMetaInfo<>(restoredMetaInfo);
+                        if (restoredMetaInfo.getBackendStateType() == StateMetaInfoSnapshot.BackendStateType.KEY_VALUE) {
+                            RegisteredKeyValueStateBackendMetaInfo<?, ?> registeredKeyedBackendStateMetaInfo =
+                                new RegisteredKeyValueStateBackendMetaInfo<>(restoredMetaInfo);
-                        stateTable = snapshotStrategy.newStateTable(registeredKeyedBackendStateMetaInfo);
-                        stateTables.put(restoredMetaInfo.getName(), stateTable);
+                            snapshotRestore = snapshotStrategy.newStateTable(registeredKeyedBackendStateMetaInfo);
+                            registeredStates.put(restoredMetaInfo.getName(), snapshotRestore);
+                        } else {
--- End diff --
Maybe check that `restoredMetaInfo.getBackendStateType() == PRIORITY_QUEUE` here.
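
For illustration only, a minimal sketch of what such an explicit check could look like. It does not use the real Flink classes: `RestoreDispatchSketch`, its nested `BackendStateType` enum, the `OPERATOR` constant used as the unexpected case, and the string placeholders for the created structures are all hypothetical stand-ins, and the actual PR may handle the `else` branch differently.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Stand-in sketch; none of these types are the real Flink classes. */
public class RestoreDispatchSketch {

    /** Hypothetical stand-in for StateMetaInfoSnapshot.BackendStateType. */
    enum BackendStateType { KEY_VALUE, PRIORITY_QUEUE, OPERATOR }

    /**
     * Registers a restored state under its name. A plain string stands in
     * for the state table or priority queue that would be created.
     */
    static String register(Map<String, String> registeredStates,
                           String name,
                           BackendStateType type) {
        String existing = registeredStates.get(name);
        if (existing != null) {
            // Only create a new structure if none was created previously.
            return existing;
        }
        String created;
        if (type == BackendStateType.KEY_VALUE) {
            created = "state-table:" + name;
        } else if (type == BackendStateType.PRIORITY_QUEUE) {
            // Explicit check instead of treating every non-KEY_VALUE type
            // as a priority queue.
            created = "priority-queue:" + name;
        } else {
            throw new IllegalStateException(
                "Unexpected state type " + type + " for state " + name);
        }
        registeredStates.put(name, created);
        return created;
    }

    public static void main(String[] args) {
        Map<String, String> registered = new LinkedHashMap<>();
        System.out.println(register(registered, "counts", BackendStateType.KEY_VALUE));
        System.out.println(register(registered, "timers", BackendStateType.PRIORITY_QUEUE));
        try {
            register(registered, "bad", BackendStateType.OPERATOR);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

With a bare `else`, any snapshot whose type is not `KEY_VALUE` would silently be treated as a priority queue. The explicit second branch makes an unexpected type fail fast during restore.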
> Checkpoint timers as part of managed keyed state instead of raw keyed state
> ---------------------------------------------------------------------------
>
> Key: FLINK-9489
> URL: https://issues.apache.org/jira/browse/FLINK-9489
> Project: Flink
> Issue Type: Sub-task
> Components: State Backends, Checkpointing
> Reporter: Stefan Richter
> Assignee: Stefan Richter
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.6.0
>
>
> Timer state should now become part of the keyed state backend snapshot, i.e.,
> stored inside the managed keyed state. This means that we have to connect our
> preparation for asynchronous checkpoints with the backend, so that the timers
> are written as part of the state for each key-group. This means that we will
> also free up the raw keyed state and might expose it to user functions in the
> future.