This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit c0ada39aafa23e58e6c80883bdb740a89d02c1ff Author: Roman <[email protected]> AuthorDate: Sat Jan 24 17:17:36 2026 +0000 [hotfix][runtime] Try to get last checkpoint on recovery regardless of checkpointing interval The check doesn't make sense because checkpointing might be disabled before recovery, or there might be a manual checkpoint. --- .../flink/runtime/scheduler/adaptive/AdaptiveScheduler.java | 6 ++---- .../runtime/scheduler/adaptive/AdaptiveSchedulerTestBase.java | 8 ++++++-- .../flink/runtime/scheduler/adaptive/LocalRecoveryTest.java | 5 ++++- 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index 52f88b1b66a..8fb59ef1707 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -1138,10 +1138,8 @@ public class AdaptiveScheduler private JobAllocationsInformation getJobAllocationsInformationFromGraphAndState( @Nullable final ExecutionGraph previousExecutionGraph) { - CompletedCheckpoint latestCompletedCheckpoint = null; - if (jobGraph.isCheckpointingEnabled()) { - latestCompletedCheckpoint = completedCheckpointStore.getLatestCheckpoint(); - } + CompletedCheckpoint latestCompletedCheckpoint = + completedCheckpointStore.getLatestCheckpoint(); if (previousExecutionGraph == null || latestCompletedCheckpoint == null) { return JobAllocationsInformation.empty(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTestBase.java index 9681a38a3d9..5cbeb5e0151 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTestBase.java @@ -88,12 +88,16 @@ public class AdaptiveSchedulerTestBase { } protected static JobGraph createJobGraphWithCheckpointing(final JobVertex... jobVertex) { + return createJobGraphWithCheckpointing(Duration.ofHours(1).toMillis(), jobVertex); + } + + protected static JobGraph createJobGraphWithCheckpointing( + long checkpointInterval, final JobVertex... jobVertex) { final JobGraph jobGraph = JobGraphBuilder.newStreamingJobGraphBuilder() .addJobVertices(Arrays.asList(jobVertex)) .build(); - SchedulerTestingUtils.enableCheckpointing( - jobGraph, null, null, Duration.ofHours(1).toMillis(), true); + SchedulerTestingUtils.enableCheckpointing(jobGraph, null, null, checkpointInterval, true); return jobGraph; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/LocalRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/LocalRecoveryTest.java index 3f08094fe50..8a39234c357 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/LocalRecoveryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/LocalRecoveryTest.java @@ -66,7 +66,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue; public class LocalRecoveryTest extends AdaptiveSchedulerTestBase { @Test void testStateSizeIsConsideredForLocalRecoveryOnRestart() throws Exception { - final JobGraph jobGraph = createJobGraphWithCheckpointing(JOB_VERTEX); + final JobGraph jobGraph = + createJobGraphWithCheckpointing( + // disable automatic checkpointing to avoid races with manual checkpoints + Long.MAX_VALUE, JOB_VERTEX); final DeclarativeSlotPool slotPool = getSlotPoolWithFreeSlots(PARALLELISM); final List<JobAllocationsInformation> capturedAllocations = new ArrayList<>(); final 
boolean localRecoveryEnabled = true;
