Myasuka commented on a change in pull request #15420:
URL: https://github.com/apache/flink/pull/15420#discussion_r657610540
##########
File path:
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
##########
@@ -369,7 +509,73 @@ public void notifyCheckpointAborted(long checkpointId) throws Exception {
// Factory function interface
private interface StateFactory {
<K, N, SV, S extends State, IS extends S> IS create(
- InternalKvState<K, N, SV> kvState, KvStateChangeLogger<SV, N> changeLogger)
+ InternalKvState<K, N, SV> kvState,
+ KvStateChangeLogger<SV, N> changeLogger,
+ InternalKeyContext<K> keyContext)
throws Exception;
}
+
+ /**
+ * @param name state name
+ * @param type state type (the only supported type currently are: {@link
+ * BackendStateType#KEY_VALUE key value}, {@link BackendStateType#PRIORITY_QUEUE priority
+ * queue})
+ * @return an existing state, i.e. the one that was already created
+ * @throws NoSuchElementException if the state wasn't created
+ * @throws UnsupportedOperationException if state type is not supported
+ */
+ public ChangelogState getExistingState(String name, BackendStateType type)
+ throws NoSuchElementException, UnsupportedOperationException {
+ ChangelogState state;
+ switch (type) {
+ case KEY_VALUE:
+ state = (ChangelogState) keyValueStatesByName.get(name);
Review comment:
I think we should not call `changelogBackend.getOrCreateKeyedState()` in
step 2 to put states into the delegated backend. I think delegating the state
descriptor, which includes the serializer and function, could help here. That
way, we would not let the delegated backend call `getOrCreateKeyedState` during restore.
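
To illustrate one possible reading of this, here is a rough sketch in which restore only looks up states that were registered beforehand and never creates one as a side effect. All class and method names below (`ExistingStateLookupSketch`, `StateType`, `registerKeyValueState`, and so on) are hypothetical stand-ins, not the actual Flink classes; only the exception behavior follows the Javadoc of `getExistingState` quoted above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;

// Hypothetical stand-in for the changelog backend; not the real Flink class.
class ExistingStateLookupSketch {

    // Hypothetical state types; OPERATOR stands for any unsupported type.
    enum StateType { KEY_VALUE, PRIORITY_QUEUE, OPERATOR }

    // Hypothetical minimal state handle.
    static final class NamedState {
        final String name;

        NamedState(String name) {
            this.name = name;
        }
    }

    private final Map<String, NamedState> keyValueStatesByName = new HashMap<>();
    private final Map<String, NamedState> priorityQueueStatesByName = new HashMap<>();

    // Explicit registration, e.g. driven by a state descriptor before restore.
    void registerKeyValueState(String name) {
        keyValueStatesByName.put(name, new NamedState(name));
    }

    // Lookup only: never creates a state, so restore has no creation side effect.
    NamedState getExistingState(String name, StateType type) {
        NamedState state;
        switch (type) {
            case KEY_VALUE:
                state = keyValueStatesByName.get(name);
                break;
            case PRIORITY_QUEUE:
                state = priorityQueueStatesByName.get(name);
                break;
            default:
                throw new UnsupportedOperationException("Unsupported state type: " + type);
        }
        if (state == null) {
            throw new NoSuchElementException("State was not created: " + name);
        }
        return state;
    }

    public static void main(String[] args) {
        ExistingStateLookupSketch backend = new ExistingStateLookupSketch();
        backend.registerKeyValueState("counts");
        System.out.println(backend.getExistingState("counts", StateType.KEY_VALUE).name);
    }
}
```

With this shape, a missing state surfaces as a `NoSuchElementException` during restore instead of being silently created in the delegated backend.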