Liu created FLINK-39267:
---------------------------
Summary: Support dynamically updating checkpoint configuration at
runtime via REST API
Key: FLINK-39267
URL: https://issues.apache.org/jira/browse/FLINK-39267
Project: Flink
Issue Type: Improvement
Components: Runtime / Checkpointing
Reporter: Liu
### Motivation
Currently, checkpoint configuration parameters such as
`execution.checkpointing.timeout` are fixed at job submission time and cannot
be changed without restarting the job. This creates operational pain in several
real-world scenarios:
**Scenario 1: Resolving consecutive checkpoint timeout failures**
When a job experiences consecutive checkpoint timeout failures (e.g., due to
state growth or temporary I/O slowdowns), users often urgently need a
successful checkpoint for:
- Committing offsets in exactly-once sinks (e.g., Kafka transactions)
- Performing job rescaling (which requires a recent checkpoint/savepoint)
- Preventing job failure when
`execution.checkpointing.tolerable-failed-checkpoints` is exceeded
The current workaround — stop-with-savepoint → modify config → restart —
introduces significant downtime and is impractical when the checkpoint itself
is failing.
**Scenario 2: Adapting to changing workload characteristics**
Long-running streaming jobs may experience varying checkpoint durations across
different time periods (e.g., peak vs. off-peak hours, backlog processing vs.
normal processing). A static timeout value forces users to choose between:
- A large timeout that delays failure detection when checkpoints are truly stuck
- A small timeout that causes unnecessary failures during legitimate slow
checkpoints
**Scenario 3: Avoiding wasted checkpoint work (Future Phase)**
When a checkpoint that has already received 80%+ of its task acknowledgements
expires, all of the snapshot I/O performed so far is thrown away. Dynamically
extending the timeout for such a checkpoint could save significant cluster
resources. This is proposed as a future enhancement (see "Phased Approach"
below).
### Phased Approach
This improvement is designed to be implemented incrementally:
**Phase 1 (This Issue): Dynamic checkpoint timeout**
- Change `CheckpointCoordinator.checkpointTimeout` from `final` to `volatile`.
- Add a setter method to update the timeout at runtime.
- Expose a new REST API endpoint for runtime configuration updates.
- The new timeout takes effect for the **next** triggered checkpoint. Already
in-flight checkpoints (whose `CheckpointCanceller` has been scheduled) are not
affected.
- This follows the same design pattern as `setIsProcessingBacklog()`
(FLIP-309), which dynamically switches checkpoint intervals at runtime without
affecting in-flight checkpoints.
**Phase 2 (Follow-up): Additional checkpoint parameters**
- Extend the REST API to support dynamically updating other checkpoint
configuration parameters, such as `checkpointInterval` and
`minPauseBetweenCheckpoints`, following the same pattern.
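To illustrate the intended extensibility, a Phase 2 request body might look like
the following. This is only a sketch; the additional field names are tentative
and would be finalized in the follow-up issue:
```
PATCH /jobs/:jobid/checkpointing/configuration
{
  "checkpointTimeout": 600000,
  "checkpointInterval": 60000,
  "minPauseBetweenCheckpoints": 10000
}
```
All fields would be optional, so a client only sends the parameters it wants to change.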
**Phase 3 (Future): Timeout extension for in-flight checkpoints**
- Reschedule the `CheckpointCanceller` for pending checkpoints when the timeout
is updated, so that the new timeout also applies to checkpoints that are
currently in flight.
- This requires modifying `PendingCheckpoint` to support resetting its
canceller handle (currently `setCancellerHandle()` throws
`IllegalStateException` if called twice).
- Edge cases (new timeout already elapsed, concurrent modifications) need
careful design.
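The Phase 3 idea can be sketched as a simplified standalone model. The class
name `ResettableCanceller` and the use of a plain `ScheduledExecutorService`
are illustrative only; in Flink the canceller handle lives in
`PendingCheckpoint`, and a real implementation would compute the remaining
time from the checkpoint's trigger timestamp rather than from "now":

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Simplified model of a checkpoint canceller whose deadline can be re-armed.
// Unlike PendingCheckpoint.setCancellerHandle(), which throws
// IllegalStateException if called twice, schedule() cancels the previously
// armed deadline before scheduling a new one.
public class ResettableCanceller {
    private final ScheduledExecutorService timer;
    private final Runnable cancelAction; // aborts the checkpoint on expiry
    private ScheduledFuture<?> handle;

    public ResettableCanceller(ScheduledExecutorService timer, Runnable cancelAction) {
        this.timer = timer;
        this.cancelAction = cancelAction;
    }

    /** Arms (or re-arms) the timeout, measured from now. */
    public synchronized void schedule(long timeoutMillis) {
        if (handle != null) {
            handle.cancel(false); // drop the previously armed deadline
        }
        if (timeoutMillis <= 0) {
            cancelAction.run(); // edge case: the new timeout has already elapsed
            return;
        }
        handle = timer.schedule(cancelAction, timeoutMillis, TimeUnit.MILLISECONDS);
    }
}
```

The `synchronized` keyword stands in for the careful handling of concurrent
modifications mentioned above (e.g., a timeout update racing with the
checkpoint completing), which still needs proper design in Flink itself.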
### Public API Changes
**New REST API endpoint:**
```
PATCH /jobs/:jobid/checkpointing/configuration

Request body (Phase 1):
{
  "checkpointTimeout": 600000
}

Response: 200 OK
```
The endpoint path `/checkpointing/configuration` is intentionally designed to
be extensible — additional parameters (interval, minPause, etc.) can be added
to the request body in Phase 2 without changing the API contract.
This is consistent with the existing `PUT /jobs/:jobid/resource-requirements`
pattern for runtime job configuration updates.
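For illustration, a client could call the proposed endpoint with the JDK's
built-in HTTP client. The job ID and REST address below are placeholders, and
the endpoint itself does not exist in Flink yet:

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class UpdateCheckpointTimeout {
    public static void main(String[] args) {
        // Hypothetical job ID and REST address for illustration only.
        String jobId = "ca5f3f4aa9d4f6b8cbc231fa4a4a4c6b";
        String json = "{\"checkpointTimeout\": 600000}";

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8081/jobs/" + jobId
                        + "/checkpointing/configuration"))
                .header("Content-Type", "application/json")
                .method("PATCH", HttpRequest.BodyPublishers.ofString(json))
                .build();

        // HttpClient.newHttpClient().send(...) would issue the PATCH; it is
        // omitted here since no cluster is running. A 200 OK response means
        // the new timeout applies to the next triggered checkpoint.
        System.out.println(request.method() + " " + request.uri());
    }
}
```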
### Design Details (Phase 1)
- The `checkpointTimeout` field in `CheckpointCoordinator` is read once per
checkpoint in `createPendingCheckpoint()` when scheduling the
`CheckpointCanceller`. Making it `volatile` ensures visibility across threads
with minimal performance impact.
- No changes needed to `PendingCheckpoint`,
`CheckpointCoordinatorConfiguration`, or Task-side code.
- The REST endpoint routes through `RestfulGateway` → `JobMaster` →
`CheckpointCoordinator`.
- Validation: the new timeout must be a positive value.
**Core change in CheckpointCoordinator:**
```java
// Change field from:
private final long checkpointTimeout;

// To:
private volatile long checkpointTimeout;

// New setter method:
public void setCheckpointTimeout(long newTimeout) {
    Preconditions.checkArgument(
            newTimeout > 0,
            "Checkpoint timeout must be positive, but was %s",
            newTimeout);
    this.checkpointTimeout = newTimeout;
    LOG.info("Checkpoint timeout for job {} updated to {} ms.", job, newTimeout);
}
```
### Compatibility, Deprecation, and Migration Plan
- Fully backward compatible. If the REST API is not called, behavior is
identical to the current implementation.
- No configuration deprecation.
- No changes to existing REST APIs.
### Related Issues / Prior Art
- **FLIP-309** (`setIsProcessingBacklog`) — dynamic checkpoint interval
switching at runtime, same pattern
- **FLIP-291** / `JobResourcesRequirementsUpdateHeaders` — REST API for
updating a running job's resource requirements, the same runtime-update pattern
--
This message was sent by Atlassian Jira
(v8.20.10#820010)