[ 
https://issues.apache.org/jira/browse/FLINK-38990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liu updated FLINK-38990:
------------------------
    Description: 
h1. Summary

Add a new configuration option execution.checkpointing.initial-delay to allow 
users to configure the initial delay before the first checkpoint is triggered 
after job startup.
h1. Motivation

When a Flink streaming job starts consuming from a message queue (e.g., Kafka, 
Pulsar) with a significant backlog, the job needs time to catch up with the 
accumulated data. During this catch-up phase, triggering checkpoints can 
negatively impact processing performance due to:
 * Memory pressure: Checkpoint barrier alignment and state snapshots consume 
additional memory
 * I/O overhead: Writing state to external storage increases disk/network I/O
 * Reduced throughput: Checkpoint operations compete with data processing 
for resources

Currently, the initial checkpoint delay is calculated randomly within the range 
[minPauseBetweenCheckpoints, baseInterval + 1) (see getRandomInitDelay() in 
CheckpointCoordinator.java), which:
 * Cannot be directly configured by users
 * May not provide sufficient delay for jobs with large backlogs
 * Has a maximum value limited to baseInterval
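
The range above can be illustrated with a minimal standalone sketch. It is not the Flink source: the class name and parameter names are made up for illustration, and it assumes minPauseBetweenCheckpoints is not larger than baseInterval.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Standalone illustration only; this is not the actual CheckpointCoordinator. */
final class RandomInitDelaySketch {

    /**
     * Picks a delay in milliseconds uniformly from [minPauseMs, baseIntervalMs + 1).
     * The result can equal baseIntervalMs but can never exceed it.
     * Assumes minPauseMs <= baseIntervalMs, otherwise nextLong rejects the range.
     */
    static long randomInitDelay(long minPauseMs, long baseIntervalMs) {
        return ThreadLocalRandom.current().nextLong(minPauseMs, baseIntervalMs + 1L);
    }

    public static void main(String[] args) {
        long minPauseMs = 5_000L;      // assumed example value
        long baseIntervalMs = 60_000L; // assumed example value
        System.out.println("initial delay (ms): " + randomInitDelay(minPauseMs, baseIntervalMs));
    }
}
```

With these example values the first checkpoint is triggered at most 60 seconds after startup, regardless of how large the backlog is.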

While Flink already provides execution.checkpointing.interval-during-backlog 
(introduced in FLIP-309) to adjust checkpoint intervals during backlog 
processing, there is no dedicated option to delay the first checkpoint trigger 
after job startup.

The new option `execution.checkpointing.initial-delay` complements the 
existing `execution.checkpointing.interval-during-backlog`:

| Configuration | Scope | Trigger | Purpose |
|--------------|-------|---------|---------|
| `initial-delay` | First checkpoint only | Unconditional | Delay first checkpoint during job startup |
| `interval-during-backlog` | All checkpoints during backlog | Source reports `isProcessingBacklog=true` | Reduce checkpoint frequency during backlog |

*Key differences:*
 * `initial-delay` works without source support, making it useful for sources 
that don't implement backlog detection
 * `initial-delay` is a one-time delay, while `interval-during-backlog` 
continuously affects checkpoint scheduling
 * Both can be used together: `initial-delay` for startup warm-up, 
`interval-during-backlog` for ongoing backlog handling
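
How the two options could combine is sketched below as a simplified model. This is not Flink's scheduler: the class, method, and parameter names are hypothetical, and the model only mirrors the rules described above (a one-time unconditional first delay, then an interval that depends on whether a source reports backlog).

```java
import java.time.Duration;

/** Simplified model of the described behavior; all names are hypothetical. */
final class CheckpointSpacingSketch {

    /** Returns the wait before the next checkpoint trigger under the rules described above. */
    static Duration nextDelay(
            boolean isFirstCheckpoint,
            boolean sourceReportsBacklog,
            Duration initialDelay,
            Duration interval,
            Duration intervalDuringBacklog) {
        if (isFirstCheckpoint) {
            // One-time and unconditional: applies whether or not the source reports backlog.
            return initialDelay;
        }
        // Every later checkpoint: spacing depends on the backlog signal from the source.
        return sourceReportsBacklog ? intervalDuringBacklog : interval;
    }

    public static void main(String[] args) {
        Duration initialDelay = Duration.ofMinutes(10); // assumed example values
        Duration interval = Duration.ofMinutes(1);
        Duration duringBacklog = Duration.ofMinutes(5);
        System.out.println(nextDelay(true, false, initialDelay, interval, duringBacklog));
        System.out.println(nextDelay(false, true, initialDelay, interval, duringBacklog));
        System.out.println(nextDelay(false, false, initialDelay, interval, duringBacklog));
    }
}
```

In this model a source without backlog detection still gets the startup warm-up, because the first branch never consults the backlog flag.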

h1. Proposed Changes

Add a new configuration option in ExecutionCheckpointingOptions: 
`execution.checkpointing.initial-delay`
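
One plausible way the option could be resolved is sketched below. This is an assumption, not the proposed patch: the issue does not state the default value or the fallback, so the sketch assumes the option is unset by default and that the existing random delay remains the fallback. All names are hypothetical.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical resolution logic; not taken from the Flink code base. */
final class InitialDelayResolutionSketch {

    /**
     * Returns the delay in milliseconds before the first checkpoint trigger.
     * Assumption: an explicitly configured value wins; otherwise a random delay
     * in [minPauseMs, baseIntervalMs + 1) is used, as in the current behavior.
     */
    static long resolveInitialDelayMs(
            Optional<Duration> configuredInitialDelay, long minPauseMs, long baseIntervalMs) {
        if (configuredInitialDelay.isPresent()) {
            return configuredInitialDelay.get().toMillis();
        }
        return ThreadLocalRandom.current().nextLong(minPauseMs, baseIntervalMs + 1L);
    }

    public static void main(String[] args) {
        // Configured: the delay may exceed the checkpoint interval.
        System.out.println(resolveInitialDelayMs(Optional.of(Duration.ofMinutes(10)), 5_000L, 60_000L));
        // Not configured: falls back to the random delay, capped by the interval.
        System.out.println(resolveInitialDelayMs(Optional.empty(), 5_000L, 60_000L));
    }
}
```

Keeping the random delay as the fallback would leave existing jobs unchanged unless they opt in.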

  was:
h1. Summary

Add a new configuration option execution.checkpointing.initial-delay to allow 
users to configure the initial delay before the first checkpoint is triggered 
after job startup.
h1. Motivation

When a Flink streaming job starts consuming from a message queue (e.g., Kafka, 
Pulsar) with a significant backlog, the job needs time to catch up with the 
accumulated data. During this catch-up phase, triggering checkpoints can 
negatively impact processing performance due to:
 * Memory pressure: Checkpoint barrier alignment and state snapshots consume 
additional memory
 * I/O overhead: Writing state to external storage increases disk/network I/O
 * Reduced throughput: Checkpoint operations compete with data processing 
for resources

Currently, the initial checkpoint delay is calculated randomly within the range 
[minPauseBetweenCheckpoints, baseInterval + 1) (see getRandomInitDelay() in 
CheckpointCoordinator.java), which:
 * Cannot be directly configured by users
 * May not provide sufficient delay for jobs with large backlogs
 * Has a maximum value limited to baseInterval

While Flink already provides execution.checkpointing.interval-during-backlog 
(introduced in FLIP-309) to adjust checkpoint intervals during backlog 
processing, there is no dedicated option to delay the first checkpoint trigger 
after job startup.
h1. Proposed Changes

Add a new configuration option in ExecutionCheckpointingOptions: 
`execution.checkpointing.initial-delay`


> Support configurable initial delay for first checkpoint trigger
> ---------------------------------------------------------------
>
>                 Key: FLINK-38990
>                 URL: https://issues.apache.org/jira/browse/FLINK-38990
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing
>            Reporter: Liu
>            Priority: Major
>              Labels: pull-request-available
>
> h1. Summary
> Add a new configuration option execution.checkpointing.initial-delay to allow 
> users to configure the initial delay before the first checkpoint is triggered 
> after job startup.
> h1. Motivation
> When a Flink streaming job starts consuming from a message queue (e.g., 
> Kafka, Pulsar) with a significant backlog, the job needs time to catch up 
> with the accumulated data. During this catch-up phase, triggering checkpoints 
> can negatively impact processing performance due to:
>  * Memory pressure: Checkpoint barrier alignment and state snapshots consume 
> additional memory
>  * I/O overhead: Writing state to external storage increases disk/network I/O
>  * Reduced throughput: Checkpoint operations compete with data processing 
> for resources
> Currently, the initial checkpoint delay is calculated randomly within the 
> range [minPauseBetweenCheckpoints, baseInterval + 1) (see 
> getRandomInitDelay() in CheckpointCoordinator.java), which:
>  * Cannot be directly configured by users
>  * May not provide sufficient delay for jobs with large backlogs
>  * Has a maximum value limited to baseInterval
> While Flink already provides execution.checkpointing.interval-during-backlog 
> (introduced in FLIP-309) to adjust checkpoint intervals during backlog 
> processing, there is no dedicated option to delay the first checkpoint 
> trigger after job startup.
>
> The new option `execution.checkpointing.initial-delay` complements the 
> existing `execution.checkpointing.interval-during-backlog`:
> | Configuration | Scope | Trigger | Purpose |
> |--------------|-------|---------|---------|
> | `initial-delay` | First checkpoint only | Unconditional | Delay first checkpoint during job startup |
> | `interval-during-backlog` | All checkpoints during backlog | Source reports `isProcessingBacklog=true` | Reduce checkpoint frequency during backlog |
> *Key differences:*
>  * `initial-delay` works without source support, making it useful for sources 
> that don't implement backlog detection
>  * `initial-delay` is a one-time delay, while `interval-during-backlog` 
> continuously affects checkpoint scheduling
>  * Both can be used together: `initial-delay` for startup warm-up, 
> `interval-during-backlog` for ongoing backlog handling
> h1. Proposed Changes
> Add a new configuration option in ExecutionCheckpointingOptions: 
> `execution.checkpointing.initial-delay`



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
