Myasuka commented on code in PR #20484:
URL: https://github.com/apache/flink/pull/20484#discussion_r939944377
##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java:
##########
@@ -149,29 +151,58 @@ public static ChangelogStateBackendHandle
getChangelogStateBackendHandle(
private static KeyedStateHandle castToAbsolutePath(
KeyedStateHandle originKeyedStateHandle) {
// For KeyedStateHandle, only KeyGroupsStateHandle and
IncrementalKeyedStateHandle
- // contain streamStateHandle, and the checkpointedStateScope of
- // IncrementalKeyedStateHandle
- // is shared, no need to
- // cast. So, only KeyGroupsStateHandle need to cast.
- if (!(originKeyedStateHandle instanceof KeyGroupsStateHandle)
- || originKeyedStateHandle instanceof
KeyGroupsSavepointStateHandle) {
+ // contain streamStateHandle, and both of them need to be cast
+ // as they all have state handles of private checkpoint scope.
+ if (originKeyedStateHandle instanceof
KeyGroupsSavepointStateHandle) {
return originKeyedStateHandle;
- } else {
+ }
+ if (originKeyedStateHandle instanceof KeyGroupsStateHandle) {
StreamStateHandle streamStateHandle =
((KeyGroupsStateHandle)
originKeyedStateHandle).getDelegateStateHandle();
if (streamStateHandle instanceof FileStateHandle) {
- FileStateHandle fileStateHandle =
- new FileStateHandle(
- ((FileStateHandle)
streamStateHandle).getFilePath(),
- streamStateHandle.getStateSize());
+ StreamStateHandle fileStateHandle =
restoreFileStateHandle(streamStateHandle);
return KeyGroupsStateHandle.restore(
((KeyGroupsStateHandle)
originKeyedStateHandle).getGroupRangeOffsets(),
fileStateHandle,
originKeyedStateHandle.getStateHandleId());
}
- return originKeyedStateHandle;
}
+ if (originKeyedStateHandle instanceof
IncrementalRemoteKeyedStateHandle) {
+ IncrementalRemoteKeyedStateHandle
incrementalRemoteKeyedStateHandle =
+ (IncrementalRemoteKeyedStateHandle)
originKeyedStateHandle;
+
+ StreamStateHandle castMetaStateHandle =
+ restoreFileStateHandle(
+
incrementalRemoteKeyedStateHandle.getMetaStateHandle());
+ Map<StateHandleID, StreamStateHandle> castPrivateStates =
+
incrementalRemoteKeyedStateHandle.getPrivateState().entrySet().stream()
+ .collect(
+ Collectors.toMap(
+ Map.Entry::getKey,
+ e ->
restoreFileStateHandle(e.getValue())));
+
+ return IncrementalRemoteKeyedStateHandle.restore(
+
incrementalRemoteKeyedStateHandle.getBackendIdentifier(),
+ incrementalRemoteKeyedStateHandle.getKeyGroupRange(),
+ incrementalRemoteKeyedStateHandle.getCheckpointId(),
+
incrementalRemoteKeyedStateHandle.getSharedStateHandles(),
+ castPrivateStates,
+ castMetaStateHandle,
Review Comment:
I think this is reasonable for the current implementation, and we can modify
this PR. However, I think we might need another PR to refactor the
`IncrementalRemoteKeyedStateHandle` generated for RocksDB native savepoints so
that the SST files do not stay in the `shared state`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]