This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c0ada39aafa23e58e6c80883bdb740a89d02c1ff
Author: Roman <[email protected]>
AuthorDate: Sat Jan 24 17:17:36 2026 +0000

    [hotfix][runtime] Try to get last checkpoint on recovery regardless of checkpointing interval
    
    The jobGraph.isCheckpointingEnabled() check doesn't make sense here:
    automatic checkpointing might be disabled before recovery, or there
    might be a manually triggered checkpoint.
---
 .../flink/runtime/scheduler/adaptive/AdaptiveScheduler.java       | 6 ++----
 .../runtime/scheduler/adaptive/AdaptiveSchedulerTestBase.java     | 8 ++++++--
 .../flink/runtime/scheduler/adaptive/LocalRecoveryTest.java       | 5 ++++-
 3 files changed, 12 insertions(+), 7 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index 52f88b1b66a..8fb59ef1707 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -1138,10 +1138,8 @@ public class AdaptiveScheduler
     private JobAllocationsInformation 
getJobAllocationsInformationFromGraphAndState(
             @Nullable final ExecutionGraph previousExecutionGraph) {
 
-        CompletedCheckpoint latestCompletedCheckpoint = null;
-        if (jobGraph.isCheckpointingEnabled()) {
-            latestCompletedCheckpoint = 
completedCheckpointStore.getLatestCheckpoint();
-        }
+        CompletedCheckpoint latestCompletedCheckpoint =
+                completedCheckpointStore.getLatestCheckpoint();
 
         if (previousExecutionGraph == null || latestCompletedCheckpoint == 
null) {
             return JobAllocationsInformation.empty();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTestBase.java
index 9681a38a3d9..5cbeb5e0151 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTestBase.java
@@ -88,12 +88,16 @@ public class AdaptiveSchedulerTestBase {
     }
 
     protected static JobGraph createJobGraphWithCheckpointing(final 
JobVertex... jobVertex) {
+        return createJobGraphWithCheckpointing(Duration.ofHours(1).toMillis(), 
jobVertex);
+    }
+
+    protected static JobGraph createJobGraphWithCheckpointing(
+            long checkpointInterval, final JobVertex... jobVertex) {
         final JobGraph jobGraph =
                 JobGraphBuilder.newStreamingJobGraphBuilder()
                         .addJobVertices(Arrays.asList(jobVertex))
                         .build();
-        SchedulerTestingUtils.enableCheckpointing(
-                jobGraph, null, null, Duration.ofHours(1).toMillis(), true);
+        SchedulerTestingUtils.enableCheckpointing(jobGraph, null, null, 
checkpointInterval, true);
         return jobGraph;
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/LocalRecoveryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/LocalRecoveryTest.java
index 3f08094fe50..8a39234c357 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/LocalRecoveryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/LocalRecoveryTest.java
@@ -66,7 +66,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 public class LocalRecoveryTest extends AdaptiveSchedulerTestBase {
     @Test
     void testStateSizeIsConsideredForLocalRecoveryOnRestart() throws Exception 
{
-        final JobGraph jobGraph = createJobGraphWithCheckpointing(JOB_VERTEX);
+        final JobGraph jobGraph =
+                createJobGraphWithCheckpointing(
+                        // disable automatic checkpointing to avoid races with 
manual checkpoints
+                        Long.MAX_VALUE, JOB_VERTEX);
         final DeclarativeSlotPool slotPool = 
getSlotPoolWithFreeSlots(PARALLELISM);
         final List<JobAllocationsInformation> capturedAllocations = new 
ArrayList<>();
         final boolean localRecoveryEnabled = true;

Reply via email to