[
https://issues.apache.org/jira/browse/FLINK-35624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17864534#comment-17864534
]
Yanfei Lei edited comment on FLINK-35624 at 7/10/24 8:14 AM:
-------------------------------------------------------------
I tested some scenarios after [~zakelly]'s fix. Thanks all for the effort;
let's see whether any other scenarios still show the problem.
|#|Scenario|Restore Mode|Retain or Delete|Expected behavior|Actual behavior|
|1|Cancel Job with file merging enabled
|-|RETAIN| * Subtask dir should be created under shared/
* Task manager dir should be created under taskowned/
* The number of chk-x directories should be equal to "retain num"|✅|
|2|Cancel Job with file merging enabled|-|DELETE| * shared/ dir should be empty.
* taskowned dir should be empty
* no chk-x dir.|✅|
|3|Restore Job
ON -> Restore -> ON|CLAIM|RETAIN|Cancel after {{job2 completes the 1st checkpoint}}:
* Job 1:
** shared/ dir should be empty.
** taskowned dir should be empty
** no chk-x dir.
* Job 2:
** Subtask dir should be created under shared/
** Task manager dir should be created under taskowned/
** The number of chk-x directories should be equal to "retain num"|✅|
|4|Restore Job
ON -> Restore -> ON|NO CLAIM|Job 1: RETAIN
Job 2: RETAIN|Cancel after {{job2 completes the 1st checkpoint}}:
* Job 1:
** The checkpoint dir layout remains the same as the layout before Job 2 starts.
* Job 2:
** Subtask dir should be created under shared/
** Task manager dir should be created under taskowned/
** The number of chk-x directories should be equal to "retain num"|✅|
|5|Restore Job
ON -> Restore -> OFF|CLAIM|Job 1: RETAIN
Job 2: RETAIN|Cancel after {{job2 completes the 1st checkpoint}}:
* Job 1:
** shared/ dir should be empty.
** taskowned dir should be empty
** no chk-x dir.
* Job 2:
** no subtask dir under shared/
** no task manager dir under taskowned/
** the operator state should be stored in chk-x/|✅|
|6|Restore Job
ON -> Restore -> OFF|NO CLAIM|Job 1: RETAIN
Job 2: RETAIN|Cancel after {{job2 completes the 1st checkpoint}}:
* Job 1:
** The checkpoint dir layout remains the same as the layout before Job 2 starts.
* Job 2:
** no subtask dir under shared/
** no task manager dir under taskowned/
** the operator state should be stored in chk-x/|✅|
|7|Restore Job
OFF -> Restore -> ON|CLAIM|Job 1: RETAIN
Job 2: RETAIN|Cancel after {{job2 completes the 1st checkpoint}}:
* Job 1:
** shared/ dir should be empty.
** taskowned dir should be empty
** no chk-x dir.
* Job 2:
** Subtask dir should be created under shared/
** Task manager dir should be created under taskowned/
** The number of chk-x directories should be equal to "retain num"|✅|
|8|Restore Job
OFF -> Restore -> ON|NO CLAIM|Job 1: RETAIN
Job 2: RETAIN|Cancel after {{job2 completes the 1st checkpoint}}:
* Job 1:
** The checkpoint dir layout remains the same as the layout before Job 2 starts.
* Job 2:
** Subtask dir should be created under shared/
** Task manager dir should be created under taskowned/
** The number of chk-x directories should be equal to "retain num"|✅|
|9|Rescale Job
ON (parallelism 2) -> Restore -> ON (parallelism 3)|CLAIM|Job 1: RETAIN
Job 2: RETAIN|Cancel after {{job2 completes the 1st checkpoint}}:
* Job 1:
** shared/ dir should be empty.
** taskowned dir should be empty
** no chk-x dir.
* Job 2:
** Subtask dir should be created under shared/, *the number of subtask dir == 3 (number of rescaled subtask)*
** Task manager dir should be created under taskowned/
** The number of chk-x directories should be equal to "retain num"|✅|
|10|Rescale Job
ON (parallelism 2) -> Restore -> ON (parallelism 3)|NO CLAIM|Job 1: RETAIN
Job 2: RETAIN|Cancel after {{job2 completes the 1st checkpoint}}:
* Job 1:
** The checkpoint dir layout remains the same as the layout before Job 2 starts. *the number of subtask dir == 2 (number of subtasks before rescale)*
* Job 2:
** Subtask dir should be created under shared/, *the number of subtask dir == 3 (number of rescaled subtask)*
** Task manager dir should be created under taskowned/
** The number of chk-x directories should be equal to "retain num"|✅|
|11|Rescale Job
ON (parallelism 2) -> Restore -> ON (parallelism 3)|NO CLAIM|Job 1: RETAIN
Job 2: DELETE|Cancel after {{job2 completes the 1st checkpoint}}:
* Job 1:
** The checkpoint dir layout remains the same as the layout before Job 2 starts. *the number of subtask dir == 2 (number of subtasks before rescale)*
* Job 2:
** shared/ dir should be empty.
** taskowned dir should be empty
** no chk-x dir.|✅|
|12|Rescale Job
ON (parallelism 2) -> Restore -> ON (parallelism 3)|CLAIM|Job 1: RETAIN
Job 2: DELETE|Cancel after {{job2 completes the 1st checkpoint}}:
* Job 1:
** shared/ dir should be empty.
** taskowned dir should be empty
** no chk-x dir.
* Job 2:
** shared/ dir should be empty.
** taskowned dir should be empty
** no chk-x dir.|✅|
|13|Rescale Job
ON (parallelism 2) -> Restore -> ON (parallelism 3)|CLAIM|Job 1: RETAIN
Job 2: DELETE|Cancel before {{job2 completes the 1st checkpoint}}:
* Job 1:
** shared/ dir should be empty.
** taskowned dir should be empty
** no chk-x dir.
* Job 2:
** shared/ dir should be empty.
** taskowned dir should be empty
** no chk-x dir.|✅|
|14|Rescale Job
ON (parallelism 2) -> Restore -> ON (parallelism 3)|NO CLAIM|Job 1: RETAIN
Job 2: DELETE|Cancel before {{job2 completes the 1st checkpoint}}:
* Job 1:
** The checkpoint dir layout remains the same as the layout before Job 2 starts.
* Job 2:
** shared/ dir should be empty.
** taskowned dir should be empty
** no chk-x dir.|✅|
|15|Job Fail Over
File merging ON|CLAIM|DELETE|
* Kill the TM to simulate Failover. [https://github.com/fredia/flink/commit/656c4ef963ae060a01dcaae1ebb54c21e968fc45]
* There should be "Restoring job xxx from" in log.
* After restoring: The checkpoint dir layout shouldn't change, the jobId in subtask dir path shouldn't change.
* After canceling:
** shared/ dir should be empty.
** taskowned dir should be empty
** no chk-x dir.|✅|
|16|Job Fail Over
File merging ON|NO CLAIM|DELETE|Same as row 15.|✅|
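
The restore and rescale rows above (3 to 14) appear to follow one pattern, which can be sketched roughly as below. This is only an illustrative summary of the table, not code from Flink; the names Scenario, expected_job1, expected_job2 and the outcome labels ("empty", "unchanged", "merged", "unmerged") are hypothetical and introduced here for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Scenario:
    """One restore/rescale row of the table (hypothetical encoding)."""
    restore_mode: str        # "CLAIM" or "NO_CLAIM"
    job2_retention: str      # "RETAIN" or "DELETE"
    job2_file_merging: bool  # True when file merging is ON for Job 2


def expected_job1(s: Scenario) -> str:
    # CLAIM rows expect Job 1's shared/, taskowned/ and chk-x dirs to be gone.
    if s.restore_mode == "CLAIM":
        return "empty"
    # NO_CLAIM rows expect Job 1's layout to stay as it was before Job 2 started.
    return "unchanged"


def expected_job2(s: Scenario) -> str:
    # DELETE rows expect nothing to be left behind after cancellation.
    if s.job2_retention == "DELETE":
        return "empty"
    # RETAIN with file merging ON: subtask dirs under shared/, a task manager
    # dir under taskowned/, and "retain num" chk-x directories.
    if s.job2_file_merging:
        return "merged"
    # RETAIN with file merging OFF: no TM-managed dirs, operator state in chk-x/.
    return "unmerged"
```

For example, row 5 (ON -> Restore -> OFF, CLAIM, both RETAIN) would map to expected_job1 == "empty" and expected_job2 == "unmerged", while row 11 (NO CLAIM, Job 2 DELETE) would map to "unchanged" and "empty".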
> Release Testing: Verify FLIP-306 Unified File Merging Mechanism for
> Checkpoints
> -------------------------------------------------------------------------------
>
> Key: FLINK-35624
> URL: https://issues.apache.org/jira/browse/FLINK-35624
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Checkpointing
> Reporter: Zakelly Lan
> Assignee: Rui Fan
> Priority: Blocker
> Labels: release-testing
> Fix For: 1.20.0
>
> Attachments: image-2024-07-07-14-04-47-065.png,
> image-2024-07-08-17-05-40-546.png
>
>
> Follow-up to the test for https://issues.apache.org/jira/browse/FLINK-32070
>
> 1.20 is the MVP version for FLIP-306. It is somewhat complex and should
> be tested carefully. The main idea of FLIP-306 is to merge checkpoint files
> on the TM side and provide new {{StateHandle}}s to the JM. There will be a
> TM-managed directory under the 'shared' checkpoint directory for each
> subtask, and a TM-managed directory under the 'taskowned' checkpoint
> directory for each Task Manager. Under those newly introduced directories,
> the checkpoint files will be merged into a smaller set of files. The
> following scenarios need to be tested, including but not limited to:
> # With file merging enabled, periodic checkpoints complete properly, and
> failover, restore and rescale also work well.
> # When file merging is switched on and off across jobs, checkpoints and
> recovery still work properly.
> # No TM-managed directory is left over, especially when no checkpoint
> completes before the job is cancelled.
> # File merging has no effect on (native) savepoints.
> Besides the behaviors above, it is also worth validating space
> amplification control and metrics. All the config options can be found under
> 'execution.checkpointing.file-merging'.
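
The directory checks described above (TM-managed directories under 'shared' and 'taskowned', no left-over directories after cancellation) could be scripted along these lines. This is a minimal sketch, assuming the job's checkpoint directory is on a locally mounted file system; the function names summarize_layout and is_fully_cleaned are hypothetical, and the sketch only counts entries rather than assuming how Flink names the subtask or task manager directories.

```python
from pathlib import Path


def _entries(parent: Path):
    """Return the sorted children of a directory, or [] if it does not exist."""
    return sorted(parent.iterdir()) if parent.is_dir() else []


def summarize_layout(job_checkpoint_dir):
    """Summarize one job's checkpoint directory (hypothetical helper)."""
    root = Path(job_checkpoint_dir)
    shared = _entries(root / "shared")
    taskowned = _entries(root / "taskowned")
    return {
        # chk-x directories, to compare against the configured "retain num"
        "chk_dirs": [p.name for p in _entries(root)
                     if p.is_dir() and p.name.startswith("chk-")],
        # sub-directories only: with file merging ON these should be the
        # TM-managed per-subtask and per-task-manager directories
        "shared_dirs": [p.name for p in shared if p.is_dir()],
        "taskowned_dirs": [p.name for p in taskowned if p.is_dir()],
        # "empty" means no entry at all, neither file nor directory
        "shared_empty": not shared,
        "taskowned_empty": not taskowned,
    }


def is_fully_cleaned(layout):
    """True when nothing is left over, as expected in the DELETE scenarios."""
    return (not layout["chk_dirs"]
            and layout["shared_empty"]
            and layout["taskowned_empty"])
```

Taking a summary before Job 2 starts and another after it is cancelled, then comparing the two dictionaries, is one way to check the "layout remains the same" expectation in the NO CLAIM scenarios.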