Myasuka commented on code in PR #20484:
URL: https://github.com/apache/flink/pull/20484#discussion_r939775768
##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java:
##########
@@ -149,29 +151,58 @@ public static ChangelogStateBackendHandle
getChangelogStateBackendHandle(
private static KeyedStateHandle castToAbsolutePath(
KeyedStateHandle originKeyedStateHandle) {
// For KeyedStateHandle, only KeyGroupsStateHandle and
IncrementalKeyedStateHandle
- // contain streamStateHandle, and the checkpointedStateScope of
- // IncrementalKeyedStateHandle
- // is shared, no need to
- // cast. So, only KeyGroupsStateHandle need to cast.
- if (!(originKeyedStateHandle instanceof KeyGroupsStateHandle)
- || originKeyedStateHandle instanceof
KeyGroupsSavepointStateHandle) {
+ // contain streamStateHandle, KeyGroupsStateHandle need to cast.
and private handles
+ // in IncrementalRemoteKeyedStateHandle also need to cast.
+ if (originKeyedStateHandle instanceof
KeyGroupsSavepointStateHandle) {
return originKeyedStateHandle;
- } else {
+ }
+ if (originKeyedStateHandle instanceof KeyGroupsStateHandle) {
StreamStateHandle streamStateHandle =
((KeyGroupsStateHandle)
originKeyedStateHandle).getDelegateStateHandle();
if (streamStateHandle instanceof FileStateHandle) {
- FileStateHandle fileStateHandle =
- new FileStateHandle(
- ((FileStateHandle)
streamStateHandle).getFilePath(),
- streamStateHandle.getStateSize());
+ StreamStateHandle fileStateHandle =
restoreFileStateHandle(streamStateHandle);
return KeyGroupsStateHandle.restore(
((KeyGroupsStateHandle)
originKeyedStateHandle).getGroupRangeOffsets(),
fileStateHandle,
originKeyedStateHandle.getStateHandleId());
}
- return originKeyedStateHandle;
}
+ if (originKeyedStateHandle instanceof
IncrementalRemoteKeyedStateHandle) {
+ IncrementalRemoteKeyedStateHandle
incrementalRemoteKeyedStateHandle =
+ (IncrementalRemoteKeyedStateHandle)
originKeyedStateHandle;
+
+ StreamStateHandle newMetaStateHandle =
Review Comment:
`castMetaStateHandle` looks better.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java:
##########
@@ -149,29 +151,58 @@ public static ChangelogStateBackendHandle
getChangelogStateBackendHandle(
private static KeyedStateHandle castToAbsolutePath(
KeyedStateHandle originKeyedStateHandle) {
// For KeyedStateHandle, only KeyGroupsStateHandle and
IncrementalKeyedStateHandle
- // contain streamStateHandle, and the checkpointedStateScope of
- // IncrementalKeyedStateHandle
- // is shared, no need to
- // cast. So, only KeyGroupsStateHandle need to cast.
- if (!(originKeyedStateHandle instanceof KeyGroupsStateHandle)
- || originKeyedStateHandle instanceof
KeyGroupsSavepointStateHandle) {
+ // contain streamStateHandle, KeyGroupsStateHandle need to cast.
and private handles
+ // in IncrementalRemoteKeyedStateHandle also need to cast.
Review Comment:
```suggestion
// contain streamStateHandle, and both of them need to be cast
// as they all have state handles of private checkpoint scope.
```
##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java:
##########
@@ -149,29 +151,58 @@ public static ChangelogStateBackendHandle
getChangelogStateBackendHandle(
private static KeyedStateHandle castToAbsolutePath(
KeyedStateHandle originKeyedStateHandle) {
// For KeyedStateHandle, only KeyGroupsStateHandle and
IncrementalKeyedStateHandle
- // contain streamStateHandle, and the checkpointedStateScope of
- // IncrementalKeyedStateHandle
- // is shared, no need to
- // cast. So, only KeyGroupsStateHandle need to cast.
- if (!(originKeyedStateHandle instanceof KeyGroupsStateHandle)
- || originKeyedStateHandle instanceof
KeyGroupsSavepointStateHandle) {
+ // contain streamStateHandle, KeyGroupsStateHandle need to cast.
and private handles
+ // in IncrementalRemoteKeyedStateHandle also need to cast.
+ if (originKeyedStateHandle instanceof
KeyGroupsSavepointStateHandle) {
return originKeyedStateHandle;
- } else {
+ }
+ if (originKeyedStateHandle instanceof KeyGroupsStateHandle) {
StreamStateHandle streamStateHandle =
((KeyGroupsStateHandle)
originKeyedStateHandle).getDelegateStateHandle();
if (streamStateHandle instanceof FileStateHandle) {
- FileStateHandle fileStateHandle =
- new FileStateHandle(
- ((FileStateHandle)
streamStateHandle).getFilePath(),
- streamStateHandle.getStateSize());
+ StreamStateHandle fileStateHandle =
restoreFileStateHandle(streamStateHandle);
return KeyGroupsStateHandle.restore(
((KeyGroupsStateHandle)
originKeyedStateHandle).getGroupRangeOffsets(),
fileStateHandle,
originKeyedStateHandle.getStateHandleId());
}
- return originKeyedStateHandle;
}
+ if (originKeyedStateHandle instanceof
IncrementalRemoteKeyedStateHandle) {
+ IncrementalRemoteKeyedStateHandle
incrementalRemoteKeyedStateHandle =
+ (IncrementalRemoteKeyedStateHandle)
originKeyedStateHandle;
+
+ StreamStateHandle newMetaStateHandle =
+ restoreFileStateHandle(
+
incrementalRemoteKeyedStateHandle.getMetaStateHandle());
+ Map<StateHandleID, StreamStateHandle> newPrivateStates =
+
incrementalRemoteKeyedStateHandle.getPrivateState().entrySet().stream()
+ .collect(
+ Collectors.toMap(
+ Map.Entry::getKey,
+ e ->
restoreFileStateHandle(e.getValue())));
+
+ return IncrementalRemoteKeyedStateHandle.restore(
+
incrementalRemoteKeyedStateHandle.getBackendIdentifier(),
+ incrementalRemoteKeyedStateHandle.getKeyGroupRange(),
+ incrementalRemoteKeyedStateHandle.getCheckpointId(),
+
incrementalRemoteKeyedStateHandle.getSharedStateHandles(),
+ newPrivateStates,
+ newMetaStateHandle,
+
incrementalRemoteKeyedStateHandle.getCheckpointedSize(),
+ incrementalRemoteKeyedStateHandle.getStateHandleId());
+ }
+ return originKeyedStateHandle;
+ }
+
+ protected static StreamStateHandle restoreFileStateHandle(
Review Comment:
This method should be private scope.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java:
##########
@@ -149,29 +151,58 @@ public static ChangelogStateBackendHandle
getChangelogStateBackendHandle(
private static KeyedStateHandle castToAbsolutePath(
KeyedStateHandle originKeyedStateHandle) {
// For KeyedStateHandle, only KeyGroupsStateHandle and
IncrementalKeyedStateHandle
- // contain streamStateHandle, and the checkpointedStateScope of
- // IncrementalKeyedStateHandle
- // is shared, no need to
- // cast. So, only KeyGroupsStateHandle need to cast.
- if (!(originKeyedStateHandle instanceof KeyGroupsStateHandle)
- || originKeyedStateHandle instanceof
KeyGroupsSavepointStateHandle) {
+ // contain streamStateHandle, KeyGroupsStateHandle need to cast.
and private handles
+ // in IncrementalRemoteKeyedStateHandle also need to cast.
+ if (originKeyedStateHandle instanceof
KeyGroupsSavepointStateHandle) {
return originKeyedStateHandle;
- } else {
+ }
+ if (originKeyedStateHandle instanceof KeyGroupsStateHandle) {
StreamStateHandle streamStateHandle =
((KeyGroupsStateHandle)
originKeyedStateHandle).getDelegateStateHandle();
if (streamStateHandle instanceof FileStateHandle) {
- FileStateHandle fileStateHandle =
- new FileStateHandle(
- ((FileStateHandle)
streamStateHandle).getFilePath(),
- streamStateHandle.getStateSize());
+ StreamStateHandle fileStateHandle =
restoreFileStateHandle(streamStateHandle);
return KeyGroupsStateHandle.restore(
((KeyGroupsStateHandle)
originKeyedStateHandle).getGroupRangeOffsets(),
fileStateHandle,
originKeyedStateHandle.getStateHandleId());
}
- return originKeyedStateHandle;
}
+ if (originKeyedStateHandle instanceof
IncrementalRemoteKeyedStateHandle) {
+ IncrementalRemoteKeyedStateHandle
incrementalRemoteKeyedStateHandle =
+ (IncrementalRemoteKeyedStateHandle)
originKeyedStateHandle;
+
+ StreamStateHandle newMetaStateHandle =
+ restoreFileStateHandle(
+
incrementalRemoteKeyedStateHandle.getMetaStateHandle());
+ Map<StateHandleID, StreamStateHandle> newPrivateStates =
+
incrementalRemoteKeyedStateHandle.getPrivateState().entrySet().stream()
Review Comment:
```suggestion
Map<StateHandleID, StreamStateHandle> castPrivateStates =
incrementalRemoteKeyedStateHandle.getPrivateState().entrySet().stream()
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]