[
https://issues.apache.org/jira/browse/FLINK-24149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Feifan Wang updated FLINK-24149:
--------------------------------
Description:
#
h1. Backgroud
We have many job with large state size in production environment. According to
the operation practice of these jobs and the analysis of some specific
problems, we believe that RocksDBStateBackend's incremental checkpoint has many
advantages over savepoint:
## Savepoint cost much longer time then incremental checkpoint in jobs with
large state. The figure below is a job in our production environment, it takes
nearly 7 minutes to complete a savepoint, while checkpoint only takes a few
seconds.( checkpoint after savepoint case longer time is a problem described in
-FLINK-23949-)
## Savepoint causes excessive cpu usage. The figure below shows the CPU usage
of the same job in the above figure :
## Savepoint may cause excessive native memory usage and eventually cause the
TaskManager process memory usage to exceed the limit. (We did not further
investigate the cause and did not try to reproduce the problem on other large
state jobs, but only increased the overhead memory. So this reason may not be
so conclusive. )
For the above reasons, we tend to use retained incremental checkpoint to
completely replace savepoint for jobs with large state size.
#
h1. Problems
##
h2. Problem 1 : retained incremental checkpoint difficult to clean up once they
used for recovery
This problem caused by jobs recoveryed from a retained incremental checkpoint
may reference files on this retained incremental checkpoint's shared directory
in subsequent checkpoints, even they are not in a same job instance. The worst
case is that the retained checkpoint will be referenced one by one, forming a
very long reference chain.This makes it difficult for users to manage retained
checkpoints. In fact, we have also suffered failures caused by incorrect
deletion of retained checkpoints.
Although we can use the file handle in checkpoint metadata to figure out which
files can be deleted, but I think it is inappropriate to let users do this.
##
h2. Problem 2 : checkpoint not relocatable
Even if we can figure out all files referenced by a checkpoint, moving these
files will invalidate the checkpoint as well, because the metadata file
references absolute file paths.
Since savepoint already be self-contained and relocatable (FLINK-5763), why
don't we use savepoint just for migrate jobs to another place ? In addition to
the savepoint performance problem in the background description, a very
important reason is that the migration requirement may come from the failure of
the original cluster. In this case, there is no opportunity to trigger
savepoint.
#
h1. Proposal
## job's checkpoint directory (user-defined-checkpoint-dir/<jobId>) contains
all their state files (self-contained)
As far as I know, in the current status, only the subsequent checkpoints of the
jobs restored from the retained checkpoint violate this constraint. One
possible solution is to re-upload all shared files at the first incremental
checkpoint after the job started, but we need to discuss how to distinguish
between a new job instance and a restart.
## use relative file path in checkpoint metadata (relocatable)
Change all file references in checkpoint metadata to the relative path relative
to the _metadata file, so we can copy user-defined-checkpoint-dir/<jobId> to
any other place.
was:
h3. 1. Backgroud
FLINK-5763 proposal make savepoint relocatable, checkpoint has similar
requirements. For example, to migrate jobs to other HDFS clusters, although it
can be achieved through a savepoint, but we prefer to use persistent
checkpoints, especially RocksDBStateBackend incremental checkpoints have better
performance than savepoint during snapshot and restore.
FLINK-8531 standardized directory layout :
{code:java}
/user-defined-checkpoint-dir
|
+ 1b080b6e710aabbef8993ab18c6de98b (job's ID)
|
+ --shared/
+ --taskowned/
+ --chk-00001/
+ --chk-00002/
+ --chk-00003/
...
{code}
* State backend will create a subdirectory with the job's ID that will contain
the actual checkpoints, such as:
user-defined-checkpoint-dir/1b080b6e710aabbef8993ab18c6de98b/
* Each checkpoint individually will store all its files in a subdirectory that
includes the checkpoint number, such as:
user-defined-checkpoint-dir/1b080b6e710aabbef8993ab18c6de98b/chk-00003/
* Files shared between checkpoints will be stored in the shared/ directory in
the same parent directory as the separate checkpoint directory, such as:
user-defined-checkpoint-dir/1b080b6e710aabbef8993ab18c6de98b/shared/
* Similar to shared files, files owned strictly by tasks will be stored in the
taskowned/ directory in the same parent directory as the separate checkpoint
directory, such as:
user-defined-checkpoint-dir/1b080b6e710aabbef8993ab18c6de98b/taskowned/
h3. Proposal
Since the individually checkpoint directory does not contain complete state
data, we cannot make it relocatable, but its parent directory can. The only
work left is make the metadata file references relative file paths.
I proposal make these changes to _*FsCheckpointStateOutputStream*_ :
* introduce _*checkpointDirectory*_ field, and remove *_allowRelativePaths_*
field
* introduce *_entropyInjecting_* field
* *_closeAndGetHandle()_* return _*RelativeFileStateHandle*_ with relative
path base on _*checkpointDirectory*_ (except entropy injecting file system)
[~yunta], [~trohrmann] , I verified this in our environment , and submitted a
pull request to accomplish this feature. Please help evaluate whether it is
appropriate.
> Make checkpoint self-contained and relocatable
> ----------------------------------------------
>
> Key: FLINK-24149
> URL: https://issues.apache.org/jira/browse/FLINK-24149
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Checkpointing
> Reporter: Feifan Wang
> Priority: Major
> Labels: pull-request-available
> Attachments: image-2021-09-08-17-06-31-560.png,
> image-2021-09-08-17-10-28-240.png, image-2021-09-08-17-55-46-898.png,
> image-2021-09-08-18-01-03-176.png
>
>
> #
> h1. Backgroud
> We have many job with large state size in production environment. According
> to the operation practice of these jobs and the analysis of some specific
> problems, we believe that RocksDBStateBackend's incremental checkpoint has
> many advantages over savepoint:
> ## Savepoint cost much longer time then incremental checkpoint in jobs with
> large state. The figure below is a job in our production environment, it
> takes nearly 7 minutes to complete a savepoint, while checkpoint only takes a
> few seconds.( checkpoint after savepoint case longer time is a problem
> described in -FLINK-23949-)
> ## Savepoint causes excessive cpu usage. The figure below shows the CPU
> usage of the same job in the above figure :
> ## Savepoint may cause excessive native memory usage and eventually cause
> the TaskManager process memory usage to exceed the limit. (We did not further
> investigate the cause and did not try to reproduce the problem on other large
> state jobs, but only increased the overhead memory. So this reason may not be
> so conclusive. )
> For the above reasons, we tend to use retained incremental checkpoint to
> completely replace savepoint for jobs with large state size.
> #
> h1. Problems
> ##
> h2. Problem 1 : retained incremental checkpoint difficult to clean up once
> they used for recovery
> This problem caused by jobs recoveryed from a retained incremental checkpoint
> may reference files on this retained incremental checkpoint's shared
> directory in subsequent checkpoints, even they are not in a same job
> instance. The worst case is that the retained checkpoint will be referenced
> one by one, forming a very long reference chain.This makes it difficult for
> users to manage retained checkpoints. In fact, we have also suffered failures
> caused by incorrect deletion of retained checkpoints.
> Although we can use the file handle in checkpoint metadata to figure out
> which files can be deleted, but I think it is inappropriate to let users do
> this.
> ##
> h2. Problem 2 : checkpoint not relocatable
> Even if we can figure out all files referenced by a checkpoint, moving these
> files will invalidate the checkpoint as well, because the metadata file
> references absolute file paths.
> Since savepoint already be self-contained and relocatable (FLINK-5763), why
> don't we use savepoint just for migrate jobs to another place ? In addition
> to the savepoint performance problem in the background description, a very
> important reason is that the migration requirement may come from the failure
> of the original cluster. In this case, there is no opportunity to trigger
> savepoint.
> #
> h1. Proposal
> ## job's checkpoint directory (user-defined-checkpoint-dir/<jobId>) contains
> all their state files (self-contained)
> As far as I know, in the current status, only the subsequent checkpoints of
> the jobs restored from the retained checkpoint violate this constraint. One
> possible solution is to re-upload all shared files at the first incremental
> checkpoint after the job started, but we need to discuss how to distinguish
> between a new job instance and a restart.
> ## use relative file path in checkpoint metadata (relocatable)
> Change all file references in checkpoint metadata to the relative path
> relative to the _metadata file, so we can copy
> user-defined-checkpoint-dir/<jobId> to any other place.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)