[ 
https://issues.apache.org/jira/browse/FLINK-34519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yunfeng Zhou updated FLINK-34519:
---------------------------------
    Description: 
In the current implementation, CheckpointCoordinator#startCheckpointScheduler 
would stop the checkpoint scheduler before starting it, and 
CheckpointCoordinator#stopCheckpointScheduler would cancel all ongoing and 
pending checkpoints. When a stop-with-savepoint request is received, checkpoint 
coordinator would trigger stopCheckpointScheduler before creating the 
savepoint, and start the scheduler afterwards if the savepoint fails.

The problem with this behavior is that it mixes up the behavior of different 
checkpointing types. For example, stopCheckpointScheduler() only needs to 
cancel previous periodic checkpoints, while the current behavior cancels 
ongoing savepoints as well. This is still acceptable for now, given that 
there are only periodic checkpoints and manual savepoints, and savepoints are 
the only ones that change checkpointing behavior once a Flink job starts. 
However, as the Batch-Streaming Unification optimizations need to change some 
of these assumptions, this problem should be fixed in the checkpoint 
coordinator.
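
As a rough illustration of the separation described above, the sketch below 
cancels only periodic checkpoints and leaves savepoints pending. It is a 
standalone model, not Flink code: PendingSnapshots, Kind, and cancelPeriodic() 
are hypothetical names that do not exist in CheckpointCoordinator.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical registry of in-flight snapshots; not a Flink class.
class PendingSnapshots {
    enum Kind { PERIODIC_CHECKPOINT, SAVEPOINT }

    private static final class Entry {
        final long id;
        final Kind kind;

        Entry(long id, Kind kind) {
            this.id = id;
            this.kind = kind;
        }
    }

    private final List<Entry> pending = new ArrayList<>();

    void add(long id, Kind kind) {
        pending.add(new Entry(id, kind));
    }

    // Cancels only periodic checkpoints and returns their ids.
    // Savepoints are left untouched and stay pending.
    List<Long> cancelPeriodic() {
        List<Long> canceled = new ArrayList<>();
        for (Iterator<Entry> it = pending.iterator(); it.hasNext(); ) {
            Entry e = it.next();
            if (e.kind == Kind.PERIODIC_CHECKPOINT) {
                canceled.add(e.id);
                it.remove();
            }
        }
        return canceled;
    }

    // Ids of everything that is still pending, in insertion order.
    List<Long> pendingIds() {
        List<Long> ids = new ArrayList<>();
        for (Entry e : pending) {
            ids.add(e.id);
        }
        return ids;
    }
}
```

With this split, stopping the periodic scheduler would call something like 
cancelPeriodic(), so an ongoing savepoint is not canceled as a side effect.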

To be exact, the checkpoint coordinator should at least distinguish between 
the following semantics.

- Periodic checkpointing is enabled so that failover recovery time is kept 
within a time limit.
- Periodic checkpointing is disabled to reduce the corresponding performance 
overhead, but the ability to checkpoint still exists and users can trigger a 
savepoint at any time.
- Checkpoints and savepoints are not allowed due to job status or topological 
requirements. Multiple requirements might apply to a Flink job at the same 
time, and releasing one of them is not enough to enable checkpoints.
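
One possible way to model the three semantics is to track the periodic flag 
separately from a set of outstanding blocking requirements, so that 
checkpointing is allowed again only once every requirement has been released. 
This is only a sketch under that assumption: CheckpointGate, Mode, and the 
requirement strings are hypothetical and not part of Flink.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the three checkpointing semantics; not a Flink class.
class CheckpointGate {
    enum Mode { PERIODIC_ENABLED, PERIODIC_DISABLED, BLOCKED }

    // Each entry is one requirement that currently forbids checkpointing.
    private final Set<String> blockers = new HashSet<>();
    private boolean periodicEnabled;

    CheckpointGate(boolean periodicEnabled) {
        this.periodicEnabled = periodicEnabled;
    }

    void setPeriodicEnabled(boolean enabled) {
        this.periodicEnabled = enabled;
    }

    void block(String requirement) {
        blockers.add(requirement);
    }

    void release(String requirement) {
        blockers.remove(requirement);
    }

    // BLOCKED wins as long as at least one requirement is outstanding.
    Mode mode() {
        if (!blockers.isEmpty()) {
            return Mode.BLOCKED;
        }
        return periodicEnabled ? Mode.PERIODIC_ENABLED : Mode.PERIODIC_DISABLED;
    }

    // Savepoints remain possible even when periodic checkpointing is off.
    boolean canTriggerSavepoint() {
        return blockers.isEmpty();
    }
}
```

Keeping the blockers in a set rather than a single boolean is what makes 
"releasing one of them is not enough" hold: the gate stays BLOCKED until the 
set is empty.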

A Flink job should also be able to switch between the checkpointing semantics 
mentioned above dynamically at runtime.

Besides, checkpoints canceled in stopCheckpointScheduler() fail with an error 
message saying "Checkpoint Coordinator is suspending", which is too ambiguous 
to be useful for debugging. The detailed reason should be recorded as well.
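
A minimal sketch of what recording the detailed reason could look like is 
shown below. It is an assumption for illustration only: CancellationReason, 
its Cause values, and the message layout are hypothetical, and the types 
Flink actually uses for checkpoint failures may look different.

```java
// Hypothetical value type carrying why a checkpoint was canceled.
final class CancellationReason {
    enum Cause {
        PERIODIC_SCHEDULER_STOPPED,
        STOP_WITH_SAVEPOINT_REQUESTED,
        CHECKPOINTING_BLOCKED
    }

    private final long checkpointId;
    private final Cause cause;
    private final String detail;

    CancellationReason(long checkpointId, Cause cause, String detail) {
        this.checkpointId = checkpointId;
        this.cause = cause;
        this.detail = detail;
    }

    Cause cause() {
        return cause;
    }

    // Builds a message that names both the cause and the free-form detail,
    // instead of a single generic "suspending" text.
    String toMessage() {
        return "Checkpoint " + checkpointId + " canceled: " + cause
                + " (" + detail + ")";
    }
}
```

A caller could then attach such a reason to the failure it reports, so the 
log distinguishes a scheduler stop from a stop-with-savepoint request.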

  was:
In the current implementation, CheckpointCoordinator#startCheckpointScheduler 
would stop the checkpoint scheduler before starting it, and 
CheckpointCoordinator#stopCheckpointScheduler would cancel all ongoing and 
pending checkpoints. When a stop-with-savepoint request is received, checkpoint 
coordinator would trigger stopCheckpointScheduler before creating the 
savepoint, and start the scheduler afterwards if the savepoint fails.

The problem with this behavior is that it mixed up different checkpointing 
types. For example, stopCheckpointScheduler() only needs to cancel previous 
periodic checkpoints, while the current behavior cancels ongoing savepoints as 
well. This behavior is still acceptable for now, given that periodic 
checkpointing is enabled so long as a job is running, and two users would 
hardly trigger savepoints at the same time. However, as the Batch-Streaming 
Unification optimizations need to change some of these assumptions, the 
checkpoint coordinator should fix this problem.

To be exact, checkpoint coordinator should at least distinguish between the 
following semantics.
 - Periodic checkpoint is enabled to ensure that failover recovery time should 
be kept within a time limit.
 - Periodic checkpoint is disabled to reduce corresponding performance 
overhead, but the ability to checkpoint still exists and users can trigger a 
savepoint anytime.
 - Checkpoint or savepoint is not allowed due to job status or topological 
requirements.

It should also be supported for a Flink job to change between the checkpointing 
semantics mentioned above dynamically during runtime.

Besides, checkpoints canceled in stopCheckpointScheduler() would fail with an 
error message saying "Checkpoint Coordinator is suspending", which is ambiguous 
for debugging. The detailed reason should be recorded as well.


> Refine checkpoint scheduling and canceling logic
> ------------------------------------------------
>
>                 Key: FLINK-34519
>                 URL: https://issues.apache.org/jira/browse/FLINK-34519
>             Project: Flink
>          Issue Type: Technical Debt
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.20.0
>            Reporter: Yunfeng Zhou
>            Priority: Major
>
> In the current implementation, CheckpointCoordinator#startCheckpointScheduler 
> would stop the checkpoint scheduler before starting it, and 
> CheckpointCoordinator#stopCheckpointScheduler would cancel all ongoing and 
> pending checkpoints. When a stop-with-savepoint request is received, 
> checkpoint coordinator would trigger stopCheckpointScheduler before creating 
> the savepoint, and start the scheduler afterwards if the savepoint fails.
> The problem with this behavior is that it mixes up the behavior of different 
> checkpointing types. For example, stopCheckpointScheduler() only needs to 
> cancel previous periodic checkpoints, while the current behavior cancels 
> ongoing savepoints as well. This is still acceptable for now, given that 
> there are only periodic checkpoints and manual savepoints, and savepoints are 
> the only ones that change checkpointing behavior once a Flink job starts. 
> However, as the Batch-Streaming Unification optimizations need to change some 
> of these assumptions, this problem should be fixed in the checkpoint 
> coordinator.
> To be exact, the checkpoint coordinator should at least distinguish between 
> the following semantics.
> - Periodic checkpointing is enabled so that failover recovery time is kept 
> within a time limit.
> - Periodic checkpointing is disabled to reduce the corresponding performance 
> overhead, but the ability to checkpoint still exists and users can trigger a 
> savepoint at any time.
> - Checkpoints and savepoints are not allowed due to job status or topological 
> requirements. Multiple requirements might apply to a Flink job at the same 
> time, and releasing one of them is not enough to enable checkpoints.
> A Flink job should also be able to switch between the checkpointing semantics 
> mentioned above dynamically at runtime.
> Besides, checkpoints canceled in stopCheckpointScheduler() fail with an error 
> message saying "Checkpoint Coordinator is suspending", which is too ambiguous 
> to be useful for debugging. The detailed reason should be recorded as well.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
