Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5074#discussion_r153492082
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java ---
    @@ -510,6 +512,13 @@ private static void serializeStreamStateHandle(
                        byte[] internalData = byteStreamStateHandle.getData();
                        dos.writeInt(internalData.length);
                        dos.write(byteStreamStateHandle.getData());
    +           } else if (stateHandle instanceof CachedStreamStateHandle) {
    --- End diff --
    
    @StefanRRichter Thanks very much for reviewing my code and for explaining 
your opinion in such detail. I'm glad that my thinking matches yours in some 
places. There are two things I want to explain a bit:
    
    1. About the 1:1 relationship between remote handle and local handle: I 
think each local state handle corresponds to the smallest storage unit of a 
checkpoint. For example, each backend generates an 
`IncrementalKeyedStateHandle` for every incremental checkpoint, but 
`IncrementalKeyedStateHandle` is a composite handle: it contains a collection 
of sub state handles that store the data (meta & sst & misc). In this case each 
sub state handle is the smallest storage unit and has a 1:1 relationship with a 
local state handle, while `IncrementalKeyedStateHandle` has a 1:N relationship 
with local state handles. (Currently, CheckpointStateOutputStream.closeAndGet() 
returns a remote handle, which I view as the smallest storage unit.) 
Incremental checkpoints can indeed be optimized: we could provide a shortcut 
that puts the cache entry into the checkpoint cache directly, so the data does 
not need to be written locally while it is transmitted to the remote end. I 
didn't do that because I wanted to provide a unified way to meet the 
requirements of all backends, and I didn't want to change the backend code so 
much.
    
    2. The local handle does not have to be a local file; it can also be stored 
in memory, on another storage medium, or even be just a mock (which may address 
the CopyOnWriteStateTableSnapshot problem described above), as long as it 
inherits from CachedStateHandle and the corresponding classes are implemented.
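    
    To make the two points above more concrete, here is a minimal sketch of 
the relationship I have in mind. All types in it (RemoteHandle, LocalHandle, 
CompositeHandle, CheckpointCache and so on) are simplified, hypothetical 
stand-ins and not the actual Flink classes or the classes in this PR. It only 
shows that a composite handle maps to N local handles, one per smallest storage 
unit, and that the local handle can be backed by memory or be a mock.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical stand-ins; none of these are real Flink classes.
public class CachedHandleSketch {

    /** Smallest storage unit of a checkpoint (e.g. one meta, sst, or misc file). */
    interface RemoteHandle {
        String id();
    }

    /** Local copy of exactly one remote handle; the storage medium is up to the implementation. */
    interface LocalHandle {
        byte[] read();
    }

    static final class SimpleRemoteHandle implements RemoteHandle {
        private final String id;
        SimpleRemoteHandle(String id) { this.id = id; }
        @Override public String id() { return id; }
    }

    /** Local handle that keeps its bytes on the heap. */
    static final class InMemoryLocalHandle implements LocalHandle {
        private final byte[] data;
        InMemoryLocalHandle(byte[] data) { this.data = data.clone(); }
        @Override public byte[] read() { return data.clone(); }
    }

    /** Mock local handle that stores nothing. */
    static final class MockLocalHandle implements LocalHandle {
        @Override public byte[] read() { return new byte[0]; }
    }

    /** Composite handle, similar in spirit to IncrementalKeyedStateHandle. */
    static final class CompositeHandle {
        final List<RemoteHandle> parts = new ArrayList<>();
    }

    /** Cache keyed by remote handle id: each remote handle has at most one local handle (1:1). */
    static final class CheckpointCache {
        private final Map<String, LocalHandle> entries = new HashMap<>();

        void put(RemoteHandle remote, LocalHandle local) {
            entries.put(remote.id(), local);
        }

        LocalHandle get(RemoteHandle remote) {
            return entries.get(remote.id());
        }

        /** Counts how many parts of a composite handle have a local copy (the N in 1:N). */
        int localCount(CompositeHandle composite) {
            int count = 0;
            for (RemoteHandle part : composite.parts) {
                if (entries.containsKey(part.id())) {
                    count++;
                }
            }
            return count;
        }
    }

    public static void main(String[] args) {
        CheckpointCache cache = new CheckpointCache();
        CompositeHandle incremental = new CompositeHandle();
        for (String name : new String[] {"meta", "sst-1", "misc"}) {
            RemoteHandle remote = new SimpleRemoteHandle(name);
            incremental.parts.add(remote);
            cache.put(remote, new InMemoryLocalHandle(name.getBytes(StandardCharsets.UTF_8)));
        }
        // One composite handle, three local handles.
        System.out.println(cache.localCount(incremental));
    }
}
```

    Keying the cache by the smallest storage unit is what keeps the approach 
independent of the backend: the cache never needs to know how a composite 
handle is structured.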
    
    IMO mapping local state to the checkpoint id can also work, but I have some 
minor questions about that:
    1. Can we provide a unified way of handling local state that meets the 
requirements of all current state backends (of course, RocksDB can be 
optimized)?  
    2. Since the local state is mapped by checkpoint id, the key range 
detection needs to be performed locally again, which is a bit repetitive. Can 
this be avoided with the work on the JM?
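    
    To illustrate what I mean by the repeated key range detection in the second 
question, here is a rough sketch. KeyRange and LocalStateStore are hypothetical 
names made up for this example, not Flink APIs, and the real design may look 
quite different. The point is only that a lookup by checkpoint id alone is not 
enough: the task still has to check locally that the stored state covers the 
key range it has been assigned.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; KeyRange and LocalStateStore are illustrative names, not Flink APIs.
public class LocalStateByCheckpointSketch {

    /** Inclusive range of key groups. */
    static final class KeyRange {
        final int start;
        final int end;

        KeyRange(int start, int end) {
            if (start > end) {
                throw new IllegalArgumentException("start must not exceed end");
            }
            this.start = start;
            this.end = end;
        }

        /** True if this range fully contains the other range. */
        boolean covers(KeyRange other) {
            return start <= other.start && other.end <= end;
        }
    }

    /** Local state registered per checkpoint id, together with the key range it holds. */
    static final class LocalStateStore {
        private final Map<Long, KeyRange> rangeByCheckpoint = new HashMap<>();

        void register(long checkpointId, KeyRange range) {
            rangeByCheckpoint.put(checkpointId, range);
        }

        /** Local state is usable only if it exists for the checkpoint and covers the assigned range. */
        boolean canRestoreLocally(long checkpointId, KeyRange assigned) {
            KeyRange local = rangeByCheckpoint.get(checkpointId);
            return local != null && local.covers(assigned);
        }
    }

    public static void main(String[] args) {
        LocalStateStore store = new LocalStateStore();
        store.register(42L, new KeyRange(0, 63));

        // Same parallelism: the assigned range is unchanged, so local state can be used.
        System.out.println(store.canRestoreLocally(42L, new KeyRange(0, 63)));
        // After rescaling: the assigned range reaches beyond what is stored locally.
        System.out.println(store.canRestoreLocally(42L, new KeyRange(32, 95)));
    }
}
```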
    
    
    Although I've expressed my ideas, I think you are more experienced than me 
in this area and your approach is likely better than mine. So if you already 
have planned issues for this, I would like to close this PR and work on those 
instead. Even though this PR shares some ideas with yours, it does not seem to 
be the base version you expected. For now, we will keep using this version of 
local checkpointing in production (it still needs to address some of the 
problems from your comments), because Flink 1.4 does not have this feature and 
we need it very much (our state size is huge). With the 1.5 release, we will 
switch to the community version.

