Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/5074#discussion_r153235421
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
---
@@ -510,6 +512,13 @@ private static void serializeStreamStateHandle(
byte[] internalData = byteStreamStateHandle.getData();
dos.writeInt(internalData.length);
dos.write(byteStreamStateHandle.getData());
+ } else if (stateHandle instanceof CachedStreamStateHandle) {
--- End diff --
This means we are actually introducing significant new code to the job
manager, that even impacts the serialization format. I think this should not
strictly be required if we map local state to checkpoint ids.
---