[ 
https://issues.apache.org/jira/browse/FLINK-39267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18067926#comment-18067926
 ] 

Zakelly Lan commented on FLINK-39267:
-------------------------------------

+1 for this. We should be able to adjust the checkpoint interval via the REST API.

> Support dynamically updating checkpoint configuration at runtime via REST API
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-39267
>                 URL: https://issues.apache.org/jira/browse/FLINK-39267
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing
>            Reporter: Liu
>            Priority: Major
>
> h2. Motivation
> Currently, checkpoint configuration parameters such as 
> {{execution.checkpointing.timeout}} are fixed at job submission time and 
> cannot be changed without restarting the job. This creates operational pain 
> in several real-world scenarios:
> *Scenario 1: Resolving consecutive checkpoint timeout failures*
> When a job experiences consecutive checkpoint timeout failures (e.g., due to 
> state growth or temporary I/O slowdowns), users often urgently need a 
> successful checkpoint for:
>  * Committing offsets in exactly-once sinks (e.g., Kafka transactions)
>  * Performing job rescaling (which requires a recent checkpoint/savepoint)
>  * Preventing job failure when 
> {{execution.checkpointing.tolerable-failed-checkpoints}} is exceeded
> The current workaround — stop-with-savepoint -> modify config -> restart — 
> introduces significant downtime and is impractical when the checkpoint itself 
> is failing.
> *Scenario 2: Adapting to changing workload characteristics*
> Long-running streaming jobs may experience varying checkpoint durations 
> across different time periods (e.g., peak vs. off-peak hours, backlog 
> processing vs. normal processing). A static timeout value forces users to 
> choose between:
>  * A large timeout that delays failure detection when checkpoints are truly 
> stuck
>  * A small timeout that causes unnecessary failures during legitimate slow 
> checkpoints
> *Scenario 3: Avoiding wasted checkpoint work (Future Phase)*
> When a checkpoint has completed 80%+ of task acknowledgements but is about to 
> expire, all the snapshot I/O work is wasted. Dynamically extending the 
> timeout could save significant cluster resources. This is proposed as a 
> future enhancement (see "Phased Approach" below).
> h2. Phased Approach
> This improvement is designed to be implemented incrementally:
> *Phase 1 (This Issue): Dynamic checkpoint timeout*
>  * Change {{CheckpointCoordinator.checkpointTimeout}} from {{final}} to 
> {{volatile}}.
>  * Add a setter method to update the timeout at runtime.
>  * Expose a new REST API endpoint for runtime configuration updates.
>  * The new timeout takes effect for the next triggered checkpoint. Already 
> in-flight checkpoints (whose {{CheckpointCanceller}} has been scheduled) are 
> not affected.
>  * This follows the same design pattern as {{setIsProcessingBacklog()}} 
> (FLIP-309), which dynamically switches checkpoint intervals at runtime 
> without affecting in-flight checkpoints.
> *Phase 2 (Follow-up): Additional checkpoint parameters*
>  * Extend the REST API to support dynamically updating other checkpoint 
> configuration parameters, such as {{checkpointInterval}} and 
> {{minPauseBetweenCheckpoints}}, following the same pattern.
> *Phase 3 (Future): Timeout extension for in-flight checkpoints*
>  * Reschedule the {{CheckpointCanceller}} for pending checkpoints when the 
> timeout is updated, so that the new timeout also applies to currently running 
> checkpoints.
>  * This requires modifying {{PendingCheckpoint}} to support resetting its 
> canceller handle (currently {{setCancellerHandle()}} throws 
> {{IllegalStateException}} if called twice).
>  * Edge cases (new timeout already elapsed, concurrent modifications) need 
> careful design.
> h2. Public API Changes
> New REST API endpoint:
> {code:java|title=Endpoint Definition}
> PATCH /jobs/:jobid/checkpointing/configuration
>
> Request body (Phase 1):
> {
>   "checkpointTimeout": 600000
> }
>
> Response: 200 OK
> {code}
> The endpoint path {{/checkpointing/configuration}} is intentionally designed 
> to be extensible — additional parameters (interval, minPause, etc.) can be 
> added to the request body in Phase 2 without changing the API contract.
> This is consistent with the existing {{PUT 
> /jobs/:jobid/resource-requirements}} pattern for runtime job configuration 
> updates.
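For illustration, a client call against the proposed endpoint might look roughly like the sketch below. The endpoint does not exist yet, so the path and the {{checkpointTimeout}} field are taken from the proposal as quoted; the class name, method name, base URL, and job ID are hypothetical. It only builds the request with the JDK's java.net.http API and does not send it.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical helper; not part of Flink. Builds a PATCH request for the proposed endpoint. */
final class CheckpointConfigPatch {

    /**
     * @param restBaseUrl base URL of the JobManager REST endpoint (assumption, e.g. http://localhost:8081)
     * @param jobId       hex job ID as shown by the Flink REST API
     * @param timeoutMs   new checkpoint timeout in milliseconds; must be positive per the proposal
     */
    static HttpRequest buildRequest(String restBaseUrl, String jobId, long timeoutMs) {
        if (timeoutMs <= 0) {
            throw new IllegalArgumentException(
                    "Checkpoint timeout must be positive, but was " + timeoutMs);
        }
        // Request body as given in the proposal (Phase 1 carries only the timeout).
        String body = "{\"checkpointTimeout\": " + timeoutMs + "}";
        return HttpRequest.newBuilder()
                .uri(URI.create(restBaseUrl + "/jobs/" + jobId + "/checkpointing/configuration"))
                .header("Content-Type", "application/json")
                // HttpRequest.Builder has no patch() shortcut, so use the generic method().
                .method("PATCH", HttpRequest.BodyPublishers.ofString(body))
                .build();
    }
}
```

Sending it would be a matter of passing the request to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()) and checking for the 200 status the proposal describes.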
> h2. Design Details (Phase 1)
>  * The {{checkpointTimeout}} field in {{CheckpointCoordinator}} is read once 
> per checkpoint in {{createPendingCheckpoint()}} when scheduling the 
> {{CheckpointCanceller}}. Making it {{volatile}} ensures visibility across 
> threads with minimal performance impact.
>  * No changes needed to {{PendingCheckpoint}}, 
> {{CheckpointCoordinatorConfiguration}}, or Task-side code.
>  * The REST endpoint routes through {{RestfulGateway}} -> {{JobMaster}} -> 
> {{CheckpointCoordinator}}.
>  * Validation: the new timeout must be a positive value.
> Core change in CheckpointCoordinator:
> {code:java}
> // Change field from:
> private final long checkpointTimeout;
> // To:
> private volatile long checkpointTimeout;
>
> // New setter method:
> public void setCheckpointTimeout(long newTimeout) {
>     Preconditions.checkArgument(newTimeout > 0,
>             "Checkpoint timeout must be positive, but was %s", newTimeout);
>     this.checkpointTimeout = newTimeout;
>     LOG.info("Checkpoint timeout for job {} updated to {} ms.", job, newTimeout);
> }
> {code}
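To make the "next triggered checkpoint only" semantics concrete, here is a minimal standalone sketch. It is not Flink code: TimeoutSketch, triggerCheckpoint, and shutdown are hypothetical stand-ins for the coordinator and its canceller scheduling. It assumes, as the quoted design says, that the timeout is read once when the canceller is scheduled.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the coordinator; illustrates the volatile read-once pattern only. */
final class TimeoutSketch {

    // volatile: a value written by the REST handler thread is visible to the trigger thread.
    private volatile long checkpointTimeoutMs;

    // Daemon timer thread so the sketch never keeps the JVM alive.
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "checkpoint-timer");
                t.setDaemon(true);
                return t;
            });

    TimeoutSketch(long initialTimeoutMs) {
        setCheckpointTimeout(initialTimeoutMs);
    }

    /** Mirrors the proposed setter: validate, then publish the new value. */
    void setCheckpointTimeout(long newTimeoutMs) {
        if (newTimeoutMs <= 0) {
            throw new IllegalArgumentException(
                    "Checkpoint timeout must be positive, but was " + newTimeoutMs);
        }
        this.checkpointTimeoutMs = newTimeoutMs;
    }

    /** Reads the timeout exactly once and schedules the canceller with that delay. */
    ScheduledFuture<?> triggerCheckpoint(Runnable onTimeout) {
        long timeoutMs = checkpointTimeoutMs; // single volatile read per checkpoint
        return timer.schedule(onTimeout, timeoutMs, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        timer.shutdownNow();
    }
}
```

A canceller scheduled before setCheckpointTimeout() is called keeps its original delay, and only checkpoints triggered afterwards pick up the new value. Extending the delay of an already scheduled canceller is what Phase 3 of the proposal would add.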
> h2. Compatibility, Deprecation, and Migration Plan
> Fully backward compatible. If the REST API is not called, behavior is 
> identical to the current implementation.
> No configuration deprecation.
> No changes to existing REST APIs.
> h2. Related Issues / Prior Art
>  * FLIP-309 ({{setIsProcessingBacklog}}): dynamic checkpoint interval 
> switching at runtime, same pattern
>  * FLIP-160 / {{JobResourcesRequirementsUpdateHeaders}}: REST API for runtime 
> job config updates



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
