GitHub user sihuazhou opened a pull request:
https://github.com/apache/flink/pull/5074
[FLINK-7873] [runtime] Introduce local recovery
## What is the purpose of the change
This PR fixes
[FLINK-7873](https://issues.apache.org/jira/browse/FLINK-7873). The current
recovery strategy always reads checkpoint data from the remote file system
(HDFS). This consumes a lot of bandwidth when the state is large (e.g. 1 TB).
Worse, if a job recovers again and again, it can saturate the network and
degrade the whole cluster. This PR proposes caching checkpoint data locally
and reading it from the local cache whenever possible, falling back to the
remote copy only if the local read fails. The advantage is that if an
execution is assigned to the same TaskManager as before, recovery uses far
less network bandwidth and completes faster.
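The local-first read with remote fallback described above can be sketched roughly as follows. This is only an illustration under simplifying assumptions, not the code in this PR: `LocalFirstReader` and its `read` method are hypothetical names, and both copies are modeled as plain file paths rather than Flink state handles.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical helper for illustration only; not a class from this PR. */
public final class LocalFirstReader {

    private LocalFirstReader() {}

    /**
     * Returns the checkpoint bytes from the local copy if it can be read,
     * otherwise from the remote copy.
     */
    public static byte[] read(Path localCopy, Path remoteCopy) throws IOException {
        if (Files.isReadable(localCopy)) {
            try {
                return Files.readAllBytes(localCopy);
            } catch (IOException localFailure) {
                // The local cache is best-effort: ignore the failure
                // and fall through to the remote copy.
            }
        }
        // A failure here is propagated, because the remote copy
        // is the authoritative one.
        return Files.readAllBytes(remoteCopy);
    }
}
```

In this sketch a missing or unreadable local copy is never an error; only a failed remote read makes recovery fail.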
## Brief change log
- *Add CheckpointCacheManager for TM to manage Local Checkpoint Data for each TM*
- *Add CheckpointCache for Task to manage Local Checkpoint Data for each Task*
- *Add CachedCheckpointStreamFactory to write checkpoint data to both DFS and local disk*
- *Add CachedStreamStateHandle to read checkpoint data from local or remote*
- See the design doc for details:
[local_recovery.docx](https://docs.google.com/document/d/1-yZvTNV6_Nx1XUh3zwAFZqGgF2nkXckTGJy4tx-WzVQ/edit?usp=sharing)
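To illustrate the "write to both DFS and local disk" item, here is a minimal sketch of a stream that duplicates every write. It is not the `CachedCheckpointStreamFactory` from this PR: `DualWriteStream` and `hasLocalCopy` are hypothetical names, and the failure policy (a remote failure fails the write, a local failure only drops the local copy) is an assumption made for the example.

```java
import java.io.IOException;
import java.io.OutputStream;

/** Hypothetical dual-write stream for illustration; not a class from this PR. */
public final class DualWriteStream extends OutputStream {

    private final OutputStream remote;
    private OutputStream local; // set to null once the local copy is abandoned

    public DualWriteStream(OutputStream remote, OutputStream local) {
        this.remote = remote;
        this.local = local;
    }

    @Override
    public void write(int b) throws IOException {
        write(new byte[] {(byte) b}, 0, 1);
    }

    @Override
    public void write(byte[] buf, int off, int len) throws IOException {
        // Assumption: the remote copy is authoritative, so its failures propagate.
        remote.write(buf, off, len);
        if (local != null) {
            try {
                local.write(buf, off, len);
            } catch (IOException e) {
                discardLocal(); // keep checkpointing with the remote copy only
            }
        }
    }

    @Override
    public void flush() throws IOException {
        remote.flush();
        if (local != null) {
            try {
                local.flush();
            } catch (IOException e) {
                discardLocal();
            }
        }
    }

    @Override
    public void close() throws IOException {
        if (local != null) {
            try {
                local.close();
            } catch (IOException e) {
                local = null;
            }
        }
        remote.close();
    }

    /** True if the local copy is still complete and usable for recovery. */
    public boolean hasLocalCopy() {
        return local != null;
    }

    private void discardLocal() {
        try {
            local.close();
        } catch (IOException ignored) {
            // Nothing more to do for a best-effort copy.
        }
        local = null;
    }
}
```

With this policy a full or broken local disk never fails a checkpoint; it only means a later recovery has to read from the remote copy.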
## Verifying this change
This change adds tests and can be verified as follows:
- Added tests in `CheckpointCacheManagerTest.java`,
`CheckpointCacheTest.java`, `CachedCheckpointStreamFactoryTest.java`,
`SharedCacheRegistryTest.java`.
- Compile this PR, deploy it on a cluster, and trigger failures randomly. (I
tested this on a YARN cluster with a naive scheduler mechanism that
allocates slots based only on state.)
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with
`@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
## Documentation
- Does this pull request introduce a new feature? (Yes)
- Doc link: [local recovery](https://docs.google.com/document/d/1-yZvTNV6_Nx1XUh3zwAFZqGgF2nkXckTGJy4tx-WzVQ/edit?usp=sharing)
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/sihuazhou/flink local_recovery
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/5074.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #5074
----
commit c384527a3a5668853334be08f5f3be98aaa43a8e
Author: summerleafs <[email protected]>
Date: 2017-11-22T05:03:21Z
introduce CheckpointCacheManager for local recovery.
Add note and unit test for CheckpointCacheManager.
add more tests for local recovery.
add more tests for local recovery.
add comment for SharedCacheRegistry.
commit b40fd80a57f59725eb888f2d6b56be97da0a5818
Author: summerleafs <[email protected]>
Date: 2017-11-24T06:00:38Z
add checkpoint cache for HeapKeyedBackend.
fix tests.
fix migrate bug.
----
---