Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5074#discussion_r153492082

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java ---
@@ -510,6 +512,13 @@ private static void serializeStreamStateHandle(
 			byte[] internalData = byteStreamStateHandle.getData();
 			dos.writeInt(internalData.length);
 			dos.write(byteStreamStateHandle.getData());
+		} else if (stateHandle instanceof CachedStreamStateHandle) {
--- End diff --

@StefanRRichter Thanks very much for reviewing my code, and thanks for the detailed explanation of your opinion; I'm very happy that my thinking is similar to yours in some places. There are two things I would like to explain:

1. About the 1:1 relationship between the remote handle and the local handle: in fact, I think each local state handle corresponds to the smallest storage unit of a checkpoint. For example, each backend generates an `IncrementalKeyedStateHandle` for every incremental checkpoint, but `IncrementalKeyedStateHandle` is a composite handle: it contains a collection of sub state handles that store the data (meta, sst, and misc files). In this case the sub state handle is the smallest storage unit; each of them has a 1:1 relationship with a local state handle, while `IncrementalKeyedStateHandle` has a 1:N relationship with local state handles. (Currently, `CheckpointStateOutputStream.closeAndGet()` returns a remote handle, which I view as the smallest storage unit.) For incremental checkpoints this can indeed be optimized: we could provide a fast path that puts a cache entry directly into the checkpoint cache, so the data would not need to be written locally while it is being transmitted to the remote end. I didn't do that because I wanted to provide a unified way to meet the requirements of all backends, and I didn't want to change the backend code too much.

2. The local handle does not have to be a local file; it can also be stored in memory, on some other storage medium, or even be just a mock (which may apply to the `CopyOnWriteStateTableSnapshot` problem described above), as long as it inherits `CachedStateHandle` and implements the corresponding classes.

IMO, mapping local state to the checkpoint id can also work, but I have some minor questions about that:

1. Can we provide a unified local state approach that meets all of the current state backend requirements (of course, RocksDB can be optimized)?
2. Since the local state is mapped according to the checkpoint id, the key range detection needs to be performed locally again, which is a bit repetitive. Can this be avoided with the work on the JM?

Although I've expressed my ideas, I think you are more professional than me in this area, and your approach is probably better than mine. So if you have any planned issues, I would like to close this PR and work on those instead; even though this PR has some ideas that are similar to yours, it seems it is not the base version you expected. For now, we will still use this version of local checkpointing in production (it still needs to address some of the problems from your comments), because Flink 1.4 does not have this feature and we need it very much (our state size is very huge). With the 1.5 release, we will switch to the community version.
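To make the dispatch pattern in the diff above concrete, here is a hedged, self-contained sketch of how a serializer can tag each handle type with a marker byte and add one more branch for a cached handle. All class names, marker values, and the `cacheKey` field are simplified stand-ins for illustration, not the real Flink types or wire format.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class SerializerBranchSketch {

    interface StreamStateHandle {}

    // Simplified stand-in for a handle that carries its bytes inline.
    static class ByteStreamStateHandle implements StreamStateHandle {
        final byte[] data;
        ByteStreamStateHandle(byte[] data) { this.data = data; }
    }

    // Hypothetical stand-in for the CachedStreamStateHandle the PR introduces:
    // it wraps a remote handle plus a key identifying the local cached copy.
    static class CachedStreamStateHandle implements StreamStateHandle {
        final StreamStateHandle remote;
        final String cacheKey;
        CachedStreamStateHandle(StreamStateHandle remote, String cacheKey) {
            this.remote = remote;
            this.cacheKey = cacheKey;
        }
    }

    static final byte BYTE_STREAM = 1;
    static final byte CACHED = 2;

    static void serialize(StreamStateHandle handle, DataOutputStream dos) throws IOException {
        if (handle instanceof ByteStreamStateHandle) {
            // Mirrors the existing branch in the diff: length-prefixed bytes.
            dos.writeByte(BYTE_STREAM);
            byte[] internalData = ((ByteStreamStateHandle) handle).data;
            dos.writeInt(internalData.length);
            dos.write(internalData);
        } else if (handle instanceof CachedStreamStateHandle) {
            // New branch: write the cache key, then recurse into the wrapped
            // remote handle so a restore can rebuild both sides.
            CachedStreamStateHandle cached = (CachedStreamStateHandle) handle;
            dos.writeByte(CACHED);
            dos.writeUTF(cached.cacheKey);
            serialize(cached.remote, dos);
        } else {
            throw new IOException("Unknown handle type: " + handle);
        }
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        CachedStreamStateHandle h = new CachedStreamStateHandle(
                new ByteStreamStateHandle(new byte[]{7, 8, 9}), "cp-1/handle-0");
        serialize(h, new DataOutputStream(buf));
        // 1 marker + (2 + 13) UTF key + (1 marker + 4 length + 3 data) = 24
        System.out.println(buf.size()); // prints 24
    }
}
```

The marker-byte-plus-recursion shape is what lets a composite or wrapping handle reuse the serialization of the handle it wraps.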
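The 1:1 vs. 1:N relationship described in point 1 can be sketched as follows. This is a hypothetical model, not Flink code: `CompositeHandle` stands in for `IncrementalKeyedStateHandle`, `StreamHandle` for the primitive sub handles, and `LocalCache` for the checkpoint cache that keeps one local copy per primitive handle.

```java
import java.util.HashMap;
import java.util.Map;

public class HandleMappingSketch {

    // A primitive remote handle, the "smallest storage unit" of a checkpoint,
    // identified here by a simple id string (illustrative only).
    static class StreamHandle {
        final String id;
        StreamHandle(String id) { this.id = id; }
    }

    // Composite handle grouping several primitive handles, e.g. the
    // meta/sst/misc files of one incremental checkpoint.
    static class CompositeHandle {
        final StreamHandle[] subHandles;
        CompositeHandle(StreamHandle... subHandles) { this.subHandles = subHandles; }
    }

    // Local cache: exactly one local entry per primitive handle (1:1),
    // so a composite handle maps to N local entries (1:N).
    static class LocalCache {
        final Map<String, byte[]> localCopies = new HashMap<>();
        void put(StreamHandle h, byte[] data) { localCopies.put(h.id, data); }
        int entriesFor(CompositeHandle c) {
            int n = 0;
            for (StreamHandle h : c.subHandles) {
                if (localCopies.containsKey(h.id)) n++;
            }
            return n;
        }
    }

    public static void main(String[] args) {
        StreamHandle meta = new StreamHandle("meta");
        StreamHandle sst = new StreamHandle("sst-000001");
        StreamHandle misc = new StreamHandle("misc");
        CompositeHandle checkpoint = new CompositeHandle(meta, sst, misc);

        LocalCache cache = new LocalCache();
        cache.put(meta, new byte[]{1});
        cache.put(sst, new byte[]{2});
        cache.put(misc, new byte[]{3});

        // One composite handle, three local entries: 1:N at the composite
        // level, 1:1 at the sub-handle level.
        System.out.println(cache.entriesFor(checkpoint)); // prints 3
    }
}
```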
---