[ 
https://issues.apache.org/jira/browse/FLINK-7873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16268724#comment-16268724
 ] 

ASF GitHub Bot commented on FLINK-7873:
---------------------------------------

Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5074#discussion_r153492082
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java ---
    @@ -510,6 +512,13 @@ private static void serializeStreamStateHandle(
                        byte[] internalData = byteStreamStateHandle.getData();
                        dos.writeInt(internalData.length);
                        dos.write(byteStreamStateHandle.getData());
    +           } else if (stateHandle instanceof CachedStreamStateHandle) {
    --- End diff --
    
    @StefanRRichter Thanks very much for reviewing my code and for laying out your opinion in such detail. I am glad that my approach is similar to what you have in mind in some places. There are two things I would like to explain a bit:
    
    1. About the 1:1 relationship between the remote handle and the local handle: I think each local state handle corresponds to the smallest storage unit of a checkpoint. For example, the backend generates an `IncrementalKeyedStateHandle` for every incremental checkpoint, but `IncrementalKeyedStateHandle` is a composite handle; it contains a collection of sub state handles that store the data (meta & sst & misc). In this case the sub state handles are the smallest storage units: each of them has a 1:1 relationship with a local state handle, while the `IncrementalKeyedStateHandle` itself has a 1:N relationship with local state handles. (Currently `CheckpointStateOutputStream.closeAndGetHandle()` returns a remote handle, which I view as the smallest storage unit.) For incremental checkpoints this can indeed be optimized: we could provide a fast path that puts the cache entry into the checkpoint cache directly, so the data would not need to be written locally again while it is being transmitted to the remote end. I did not do that because I wanted to provide a unified way that meets the requirements of all backends, and I did not want to change the backend code too much.
    
    2. The local handle does not have to be a local file; it can also be stored in memory, or on another storage medium, or even be just a mock (which may address the `CopyOnWriteStateTableSnapshot` problem described above), as long as it inherits from `CachedStateHandle` and the corresponding classes are implemented. (A rough sketch of both points follows below.)
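    
    Below is a minimal sketch of what I mean by these two points. Only the name `CachedStateHandle` comes from the text above (and `IncrementalKeyedStateHandle` from Flink); every other class, field and method is hypothetical and illustrative, not the actual code of this PR:
    
    ```java
    // Illustrative sketch only -- the only name taken from the discussion above is
    // CachedStateHandle; everything else is hypothetical and NOT the PR's actual API.
    import java.io.ByteArrayInputStream;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.List;

    /** Contract for the locally cached counterpart of one "smallest storage unit". */
    interface CachedStateHandle {
        /** Returns a stream over the local copy, or null if it is not available. */
        InputStream openIfPresent() throws IOException;
    }

    /** Local copy kept as a file on the TaskManager's disk. */
    final class FileCachedStateHandle implements CachedStateHandle {
        private final File localFile;

        FileCachedStateHandle(File localFile) {
            this.localFile = localFile;
        }

        @Override
        public InputStream openIfPresent() throws IOException {
            return localFile.exists() ? new FileInputStream(localFile) : null;
        }
    }

    /** Local copy kept in memory. */
    final class MemoryCachedStateHandle implements CachedStateHandle {
        private final byte[] data;

        MemoryCachedStateHandle(byte[] data) {
            this.data = data;
        }

        @Override
        public InputStream openIfPresent() {
            return data != null ? new ByteArrayInputStream(data) : null;
        }
    }

    /** A mock entry with no local data: reads always fall back to the remote handle. */
    final class PlaceholderCachedStateHandle implements CachedStateHandle {
        @Override
        public InputStream openIfPresent() {
            return null;
        }
    }

    /** One smallest storage unit (e.g. a single sst/meta/misc file): 1:1 with its local copy. */
    final class RemoteUnit {
        final String remotePath;            // e.g. the path of the file on HDFS
        final CachedStateHandle localCopy;  // the 1:1 cached counterpart

        RemoteUnit(String remotePath, CachedStateHandle localCopy) {
            this.remotePath = remotePath;
            this.localCopy = localCopy;
        }
    }

    /** A composite handle (in the spirit of IncrementalKeyedStateHandle): 1:N with local copies. */
    final class CompositeRemoteHandle {
        final List<RemoteUnit> units;

        CompositeRemoteHandle(List<RemoteUnit> units) {
            this.units = units;
        }
    }
    ```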
    
    IMO mapping local state to the checkpoint id could also work, but I have some minor questions about it:
    1. Can we provide a unified local state mechanism that meets all of the current state backend requirements (of course, RocksDB can be optimized separately)?
    2. Since the local state is mapped by checkpoint id, the key-range detection needs to be performed locally again, which is a bit repetitive; can this be avoided with the work on the JM?
    
    
    Although I have expressed my ideas, I think you are more experienced than me in this area and your design is probably better than mine. So if you have any planned issues, I would be happy to close this PR and work on your planned issues instead; even though this PR shares some ideas with yours, it does not seem to be the base version you expected. For now, however, we will still use this version of the local checkpoint (it still needs to address some of the problems you pointed out in your comments) in production, because Flink 1.4 does not have this feature and we need it very much (our state size is huge). Once 1.5 is released, we will switch to the community version.


> Introduce CheckpointCacheManager for reading checkpoint data locally when 
> performing failover
> ---------------------------------------------------------------------------------------------
>
>                 Key: FLINK-7873
>                 URL: https://issues.apache.org/jira/browse/FLINK-7873
>             Project: Flink
>          Issue Type: New Feature
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.3.2
>            Reporter: Sihua Zhou
>            Assignee: Sihua Zhou
>
> Why I am introducing this:
>     The current recovery strategy always reads checkpoint data from a remote FileStream (HDFS). This costs a lot of bandwidth when the state is big (e.g. 1 TB). What's worse, if a job performs recovery again and again, it can eat up all the network bandwidth and badly hurt the cluster. So I propose that we cache the checkpoint data locally and read checkpoint data from the local cache whenever we can, reading from the remote storage only if the local read fails. The advantage is that if an execution is assigned to the same TaskManager as before, it can save a lot of bandwidth and recover faster.
> Solution:
>     The TaskManager does the caching and manages the cached data itself. It simply uses a TTL-like method to manage the disposal of cache entries: an entry is disposed if it has not been touched for some time X, and touching an entry resets its TTL. This way all the work is done by the TaskManager and it is transparent to the JobManager. The only problem is that we may dispose of an entry that is still useful; in that case we have to read the data from the remote storage after all, but users can avoid this by setting a proper TTL value according to the checkpoint interval and other factors.
> Can someone give me some advice? I would appreciate it very much~
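
A minimal sketch of the TTL-like disposal described in the solution above; the class below is purely illustrative (it is not the PR's CheckpointCacheManager, and all names are hypothetical):

```java
// Illustrative sketch only -- not the PR's CheckpointCacheManager.
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class TtlCache<K, V> {
    private final long ttlMillis;
    private final Map<K, Entry<V>> entries = new ConcurrentHashMap<>();

    TtlCache(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Register a cache entry (e.g. a locally written copy of checkpoint data). */
    void put(K key, V value) {
        entries.put(key, new Entry<>(value, System.currentTimeMillis()));
    }

    /** Touching an entry resets its TTL; returns null on a cache miss. */
    V get(K key) {
        Entry<V> e = entries.get(key);
        if (e == null) {
            return null;
        }
        e.lastAccess = System.currentTimeMillis();
        return e.value;
    }

    /** Called periodically by the TaskManager; disposes entries untouched for longer than the TTL. */
    void sweep() {
        long now = System.currentTimeMillis();
        for (Iterator<Entry<V>> it = entries.values().iterator(); it.hasNext(); ) {
            if (now - it.next().lastAccess > ttlMillis) {
                it.remove(); // dispose: delete the local file / free the memory here
            }
        }
    }

    private static final class Entry<V> {
        final V value;
        volatile long lastAccess;

        Entry(V value, long lastAccess) {
            this.value = value;
            this.lastAccess = lastAccess;
        }
    }
}
```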


