[
https://issues.apache.org/jira/browse/FLINK-38990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Liu updated FLINK-38990:
------------------------
Description:
h1. Summary
Add a new configuration option execution.checkpointing.initial-delay to allow
users to configure the initial delay before the first checkpoint is triggered
after job startup.
h1. Motivation
When a Flink streaming job starts consuming from a message queue (e.g., Kafka,
Pulsar) with a significant backlog, the job needs time to catch up with the
accumulated data. During this catch-up phase, triggering checkpoints can
negatively impact processing performance due to:
* Memory pressure: Checkpoint barrier alignment and state snapshots consume
additional memory
* I/O overhead: Writing state to external storage increases disk/network I/O
* Reduced throughput: Checkpoint operations compete with data processing
for resources
Currently, the initial checkpoint delay is calculated randomly within the range
[minPauseBetweenCheckpoints, baseInterval + 1) (see getRandomInitDelay() in
CheckpointCoordinator.java), which:
* Cannot be directly configured by users
* May not provide sufficient delay for jobs with large backlogs
* Has a maximum value limited to baseInterval
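For illustration, the range described above can be sketched as follows. This is a minimal standalone sketch, not a copy of CheckpointCoordinator.java: the class name RandomInitDelaySketch, the parameter names, and the argument check are assumptions made for this example, and only the interval [minPauseBetweenCheckpoints, baseInterval + 1) comes from the description.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper illustrating the range described in this issue.
final class RandomInitDelaySketch {

    /** Draws a delay in milliseconds uniformly from [minPauseMillis, baseIntervalMillis + 1). */
    static long randomInitDelay(long minPauseMillis, long baseIntervalMillis) {
        // Assumption for this sketch: the pause never exceeds the base interval.
        if (minPauseMillis < 0 || minPauseMillis > baseIntervalMillis) {
            throw new IllegalArgumentException("expected 0 <= minPause <= baseInterval");
        }
        // The upper bound is exclusive, so baseInterval itself can still be drawn.
        return ThreadLocalRandom.current().nextLong(minPauseMillis, baseIntervalMillis + 1L);
    }

    public static void main(String[] args) {
        // With a 1 s pause and a 60 s interval, the first checkpoint is never
        // scheduled later than 60 s after startup, however large the backlog is.
        System.out.println(randomInitDelay(1_000L, 60_000L));
    }
}
```

The sketch makes the third limitation concrete: no call can return more than baseInterval, so a job that needs longer to catch up cannot get a longer delay this way.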
While Flink already provides execution.checkpointing.interval-during-backlog
(introduced in FLIP-309) to adjust checkpoint intervals during backlog
processing, there is no dedicated option to delay the first checkpoint trigger
after job startup. The new option `execution.checkpointing.initial-delay` is
complementary to the existing
`execution.checkpointing.interval-during-backlog`:
| Configuration | Scope | Trigger | Purpose |
|--------------|-------|---------|---------|
| `initial-delay` | First checkpoint only | Unconditional | Delay first checkpoint during job startup |
| `interval-during-backlog` | All checkpoints during backlog | Source reports `isProcessingBacklog=true` | Reduce checkpoint frequency during backlog |
*Key differences:*
* `initial-delay` works without source support, making it useful for sources
that don't implement backlog detection
* `initial-delay` is a one-time delay, while `interval-during-backlog`
continuously affects checkpoint scheduling
* Both can be used together: `initial-delay` for startup warm-up,
`interval-during-backlog` for ongoing backlog handling
h1. Proposed Changes
Add a new configuration option in ExecutionCheckpointingOptions:
execution.checkpointing.initial-delay
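
One way the proposed option could interact with the existing random delay is sketched below. This is an assumption-laden sketch, not the proposed implementation: the issue does not state the option's type, default value, or fallback behavior, so the Duration type, the "unset means keep the random delay" rule, and the names InitialDelaySketch and firstCheckpointDelayMillis are all hypothetical.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical selection logic for the delay before the first checkpoint.
final class InitialDelaySketch {

    /**
     * Returns the delay in milliseconds before the first checkpoint.
     * Assumption: an explicitly configured initial delay wins; otherwise the
     * random delay in [minPauseMillis, baseIntervalMillis + 1) is used.
     */
    static long firstCheckpointDelayMillis(
            Optional<Duration> configuredInitialDelay,
            long minPauseMillis,
            long baseIntervalMillis) {
        if (configuredInitialDelay.isPresent()) {
            Duration delay = configuredInitialDelay.get();
            if (delay.isNegative()) {
                throw new IllegalArgumentException("initial delay must not be negative");
            }
            // Unlike the random delay, this value may exceed baseInterval.
            return delay.toMillis();
        }
        return ThreadLocalRandom.current().nextLong(minPauseMillis, baseIntervalMillis + 1L);
    }

    public static void main(String[] args) {
        // A 10-minute warm-up with a 60 s checkpoint interval.
        System.out.println(
                firstCheckpointDelayMillis(Optional.of(Duration.ofMinutes(10)), 1_000L, 60_000L));
    }
}
```

Falling back to the random delay when the option is unset would keep existing jobs unchanged; whether the actual change behaves this way is not specified in this description.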
was:
h1. Summary
Add a new configuration option execution.checkpointing.initial-delay to allow
users to configure the initial delay before the first checkpoint is triggered
after job startup.
h1. Motivation
When a Flink streaming job starts consuming from a message queue (e.g., Kafka,
Pulsar) with a significant backlog, the job needs time to catch up with the
accumulated data. During this catch-up phase, triggering checkpoints can
negatively impact processing performance due to:
* Memory pressure: Checkpoint barrier alignment and state snapshots consume
additional memory
* I/O overhead: Writing state to external storage increases disk/network I/O
* Reduced throughput: Checkpoint operations compete with data processing
for resources
Currently, the initial checkpoint delay is calculated randomly within the range
[minPauseBetweenCheckpoints, baseInterval + 1) (see getRandomInitDelay() in
CheckpointCoordinator.java), which:
* Cannot be directly configured by users
* May not provide sufficient delay for jobs with large backlogs
* Has a maximum value limited to baseInterval
While Flink already provides execution.checkpointing.interval-during-backlog
(introduced in FLIP-309) to adjust checkpoint intervals during backlog
processing, there is no dedicated option to delay the first checkpoint trigger
after job startup.
h1. Proposed Changes
Add a new configuration option in ExecutionCheckpointingOptions:
execution.checkpointing.initial-delay
> Support configurable initial delay for first checkpoint trigger
> ---------------------------------------------------------------
>
> Key: FLINK-38990
> URL: https://issues.apache.org/jira/browse/FLINK-38990
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Checkpointing
> Reporter: Liu
> Priority: Major
> Labels: pull-request-available
>