[
https://issues.apache.org/jira/browse/FLINK-39267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18067926#comment-18067926
]
Zakelly Lan commented on FLINK-39267:
-------------------------------------
+1 for this. We should be able to adjust the checkpoint interval via the API.
> Support dynamically updating checkpoint configuration at runtime via REST API
> -----------------------------------------------------------------------------
>
> Key: FLINK-39267
> URL: https://issues.apache.org/jira/browse/FLINK-39267
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Checkpointing
> Reporter: Liu
> Priority: Major
>
> h2. Motivation
> Currently, checkpoint configuration parameters such as
> {{execution.checkpointing.timeout}} are fixed at job submission time and
> cannot be changed without restarting the job. This creates operational pain
> in several real-world scenarios:
> *Scenario 1: Resolving consecutive checkpoint timeout failures*
> When a job experiences consecutive checkpoint timeout failures (e.g., due to
> state growth or temporary I/O slowdowns), users often urgently need a
> successful checkpoint for:
> * Committing offsets in exactly-once sinks (e.g., Kafka transactions)
> * Performing job rescaling (which requires a recent checkpoint/savepoint)
> * Preventing job failure when
> {{execution.checkpointing.tolerable-failed-checkpoints}} is exceeded
> The current workaround — stop-with-savepoint -> modify config -> restart —
> introduces significant downtime and is impractical when the checkpoint itself
> is failing.
> *Scenario 2: Adapting to changing workload characteristics*
> Long-running streaming jobs may experience varying checkpoint durations
> across different time periods (e.g., peak vs. off-peak hours, backlog
> processing vs. normal processing). A static timeout value forces users to
> choose between:
> * A large timeout that delays failure detection when checkpoints are truly
> stuck
> * A small timeout that causes unnecessary failures during legitimate slow
> checkpoints
> *Scenario 3: Avoiding wasted checkpoint work (Future Phase)*
> When a checkpoint has completed 80%+ of task acknowledgements but is about to
> expire, all the snapshot I/O work is wasted. Dynamically extending the
> timeout could save significant cluster resources. This is proposed as a
> future enhancement (see "Phased Approach" below).
> h2. Phased Approach
> This improvement is designed to be implemented incrementally:
> *Phase 1 (This Issue): Dynamic checkpoint timeout*
> * Change {{CheckpointCoordinator.checkpointTimeout}} from {{final}} to
> {{volatile}}.
> * Add a setter method to update the timeout at runtime.
> * Expose a new REST API endpoint for runtime configuration updates.
> * The new timeout takes effect for the next triggered checkpoint. Already
> in-flight checkpoints (whose {{CheckpointCanceller}} has been scheduled) are
> not affected.
> * This follows the same design pattern as {{setIsProcessingBacklog()}}
> (FLIP-309), which dynamically switches checkpoint intervals at runtime
> without affecting in-flight checkpoints.
> *Phase 2 (Follow-up): Additional checkpoint parameters*
> * Extend the REST API to support dynamically updating other checkpoint
> configuration parameters, such as {{checkpointInterval}} and
> {{minPauseBetweenCheckpoints}}, following the same pattern.
> *Phase 3 (Future): Timeout extension for in-flight checkpoints*
> * Reschedule the {{CheckpointCanceller}} for pending checkpoints when
> timeout is updated, so that the new timeout also applies to currently running
> checkpoints.
> * This requires modifying {{PendingCheckpoint}} to support resetting its
> canceller handle (currently {{setCancellerHandle()}} throws
> {{IllegalStateException}} if called twice).
> * Edge cases (new timeout already elapsed, concurrent modifications) need
> careful design.
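> The Phase 3 rescheduling idea can be illustrated outside of Flink. The
> following is a minimal, self-contained sketch (all class and method names
> are hypothetical, not Flink's actual {{PendingCheckpoint}} API): the
> canceller handle is replaceable, and a new timeout is measured from the
> checkpoint's original start time, so extending the timeout keeps an
> in-flight checkpoint alive.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch of a pending checkpoint whose canceller can be rescheduled. */
class ReschedulablePendingCheckpoint {
    private final ScheduledExecutorService timer;
    private final long startNanos = System.nanoTime();
    private final AtomicBoolean expired = new AtomicBoolean(false);
    private ScheduledFuture<?> cancellerHandle;

    ReschedulablePendingCheckpoint(ScheduledExecutorService timer, long timeoutMillis) {
        this.timer = timer;
        this.cancellerHandle = scheduleCanceller(timeoutMillis);
    }

    private ScheduledFuture<?> scheduleCanceller(long timeoutMillis) {
        // The new timeout is relative to checkpoint start, not to "now",
        // so only the remaining time is scheduled.
        long elapsedMillis = (System.nanoTime() - startNanos) / 1_000_000;
        long remaining = Math.max(0, timeoutMillis - elapsedMillis);
        return timer.schedule(() -> expired.set(true), remaining, TimeUnit.MILLISECONDS);
    }

    /**
     * Unlike today's setCancellerHandle(), which throws if called twice,
     * replacing the handle is allowed. Edge cases (canceller already firing,
     * new timeout already elapsed) still need the careful design noted above.
     */
    synchronized void resetTimeout(long newTimeoutMillis) {
        if (expired.get()) {
            return; // too late: checkpoint already aborted
        }
        cancellerHandle.cancel(false);
        cancellerHandle = scheduleCanceller(newTimeoutMillis);
    }

    boolean isExpired() {
        return expired.get();
    }
}
```

> The sketch deliberately keeps the old behavior for checkpoints whose
> canceller has already fired; only still-pending checkpoints are extended.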
> h2. Public API Changes
> New REST API endpoint:
> {code:java|title=Endpoint Definition}
> PATCH /jobs/:jobid/checkpointing/configuration
>
> Request body (Phase 1):
> {
>     "checkpointTimeout": 600000
> }
>
> Response: 200 OK
> {code}
> The endpoint path {{/checkpointing/configuration}} is intentionally designed
> to be extensible — additional parameters (interval, minPause, etc.) can be
> added to the request body in Phase 2 without changing the API contract.
> This is consistent with the existing {{PUT
> /jobs/:jobid/resource-requirements}} pattern for runtime job configuration
> updates.
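> The extensibility argument can be sketched with a plain-Java request-body
> class (the class name and fields are illustrative, not part of this
> proposal's actual code): Phase 1 carries only the timeout, and later phases
> add further optional fields without breaking clients that send only
> {{checkpointTimeout}}.

```java
import java.util.Optional;

/** Hypothetical request body for PATCH /jobs/:jobid/checkpointing/configuration. */
final class CheckpointConfigUpdateRequest {
    // Phase 1: only the timeout. Phase 2 fields (interval, minPause, ...)
    // would be added as further Optional fields, so omitting them keeps
    // old request bodies valid.
    private final Optional<Long> checkpointTimeout;

    CheckpointConfigUpdateRequest(Long checkpointTimeout) {
        this.checkpointTimeout = Optional.ofNullable(checkpointTimeout);
        // Validation mirrors the coordinator-side rule: timeout must be positive.
        this.checkpointTimeout.ifPresent(t -> {
            if (t <= 0) {
                throw new IllegalArgumentException(
                        "checkpointTimeout must be positive, but was " + t);
            }
        });
    }

    Optional<Long> getCheckpointTimeout() {
        return checkpointTimeout;
    }
}
```

> Rejecting invalid values at deserialization time lets the handler return a
> 400 response before anything reaches the {{JobMaster}}.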
> h2. Design Details (Phase 1)
> * The {{checkpointTimeout}} field in {{CheckpointCoordinator}} is read once
> per checkpoint in {{createPendingCheckpoint()}} when scheduling the
> {{CheckpointCanceller}}. Making it {{volatile}} ensures visibility across
> threads with minimal performance impact.
> * No changes needed to {{PendingCheckpoint}},
> {{CheckpointCoordinatorConfiguration}}, or Task-side code.
> * The REST endpoint routes through {{RestfulGateway}} -> {{JobMaster}} ->
> {{CheckpointCoordinator}}.
> * Validation: the new timeout must be a positive value.
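> The routing and validation can be sketched with simplified stand-ins for
> the real components (the {{Stub*}} names below are hypothetical; the real
> call chain goes through Flink's RPC gateways):

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical gateway method, simplified from the RestfulGateway -> JobMaster chain. */
interface CheckpointTimeoutGateway {
    CompletableFuture<Void> updateCheckpointTimeout(long timeoutMillis);
}

/** Stand-in for CheckpointCoordinator with the proposed volatile field and setter. */
final class StubCheckpointCoordinator {
    volatile long checkpointTimeout;

    void setCheckpointTimeout(long newTimeout) {
        if (newTimeout <= 0) {
            throw new IllegalArgumentException(
                    "Checkpoint timeout must be positive, but was " + newTimeout);
        }
        this.checkpointTimeout = newTimeout;
    }
}

/** Stand-in for JobMaster: forwards the update and surfaces validation failures. */
final class StubJobMaster implements CheckpointTimeoutGateway {
    private final StubCheckpointCoordinator coordinator;

    StubJobMaster(StubCheckpointCoordinator coordinator) {
        this.coordinator = coordinator;
    }

    @Override
    public CompletableFuture<Void> updateCheckpointTimeout(long timeoutMillis) {
        try {
            coordinator.setCheckpointTimeout(timeoutMillis);
            return CompletableFuture.completedFuture(null);
        } catch (IllegalArgumentException e) {
            // The REST handler would map this to a 400 response.
            return CompletableFuture.failedFuture(e);
        }
    }
}
```

> The next checkpoint triggered after the future completes reads the new
> value; in-flight checkpoints are untouched, matching the Phase 1 scope.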
> Core change in CheckpointCoordinator:
> {code:java}
> // Change field from:
> private final long checkpointTimeout;
>
> // To:
> private volatile long checkpointTimeout;
>
> // New setter method:
> public void setCheckpointTimeout(long newTimeout) {
>     Preconditions.checkArgument(
>             newTimeout > 0,
>             "Checkpoint timeout must be positive, but was %s",
>             newTimeout);
>     this.checkpointTimeout = newTimeout;
>     LOG.info("Checkpoint timeout for job {} updated to {} ms.", job, newTimeout);
> }
> {code}
> h2. Compatibility, Deprecation, and Migration Plan
> Fully backward compatible. If the REST API is not called, behavior is
> identical to the current implementation.
> No configuration deprecation.
> No changes to existing REST APIs.
> h2. Related Issues / Prior Art
> FLIP-309 ({{setIsProcessingBacklog}}) — dynamic checkpoint interval
> switching at runtime, same pattern
> FLIP-160 / {{JobResourcesRequirementsUpdateHeaders}} — REST API for runtime
> job config updates
--
This message was sent by Atlassian Jira
(v8.20.10#820010)