This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b9b4e53a59 [FLINK-32469][REST] Improve checkpoint REST APIs for programmatic access
7b9b4e53a59 is described below

commit 7b9b4e53a59ab8f4f2a99a6e162a794d264f7daf
Author: Hong Liang Teoh <[email protected]>
AuthorDate: Thu Jun 29 08:32:57 2023 +0100

    [FLINK-32469][REST] Improve checkpoint REST APIs for programmatic access
---
 .../shortcodes/generated/expert_rest_section.html  |  12 +
 .../shortcodes/generated/rest_configuration.html   |  12 +
 .../apache/flink/configuration/RestOptions.java    |  24 ++
 .../flink/runtime/dispatcher/Dispatcher.java       |   8 +
 .../apache/flink/runtime/jobmaster/JobMaster.java  |   6 +
 .../flink/runtime/jobmaster/JobMasterGateway.java  |   9 +
 .../rest/handler/RestHandlerConfiguration.java     |  40 ++-
 .../job/checkpoints/AbstractCheckpointHandler.java |  24 +-
 .../AbstractCheckpointStatsHandler.java            | 119 ++++++++
 .../CheckpointStatisticDetailsHandler.java         |   9 +-
 .../CheckpointingStatisticsHandler.java            |  25 +-
 .../TaskCheckpointStatisticDetailsHandler.java     |   9 +-
 .../flink/runtime/scheduler/SchedulerBase.java     |   7 +
 .../flink/runtime/scheduler/SchedulerNG.java       |  10 +
 .../scheduler/adaptive/AdaptiveScheduler.java      |   6 +
 .../NonLeaderRetrievalRestfulGateway.java          |   7 +
 .../flink/runtime/webmonitor/RestfulGateway.java   |  11 +
 .../runtime/webmonitor/WebMonitorEndpoint.java     |  19 +-
 .../flink/runtime/dispatcher/DispatcherTest.java   |  67 +++++
 .../flink/runtime/jobmaster/JobMasterTest.java     |  37 +++
 .../runtime/jobmaster/TestingJobManagerRunner.java |   2 +-
 .../jobmaster/utils/TestingJobMasterGateway.java   |  14 +
 .../utils/TestingJobMasterGatewayBuilder.java      |  10 +
 .../rest/handler/RestHandlerConfigurationTest.java |  38 +++
 .../AbstractCheckpointStatsHandlerTest.java        | 305 +++++++++++++++++++++
 .../runtime/scheduler/TestingSchedulerNG.java      |   7 +
 .../webmonitor/TestingDispatcherGateway.java       |   5 +
 .../runtime/webmonitor/TestingRestfulGateway.java  |  29 ++
 28 files changed, 828 insertions(+), 43 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/expert_rest_section.html b/docs/layouts/shortcodes/generated/expert_rest_section.html
index 733f7cd0a88..6c0d5e1a643 100644
--- a/docs/layouts/shortcodes/generated/expert_rest_section.html
+++ b/docs/layouts/shortcodes/generated/expert_rest_section.html
@@ -20,6 +20,18 @@
             <td>Long</td>
             <td>The time in ms that the client waits for the leader address, e.g., Dispatcher or WebMonitorEndpoint</td>
         </tr>
+        <tr>
+            <td><h5>rest.cache.checkpoint-statistics.size</h5></td>
+            <td style="word-wrap: break-word;">1000</td>
+            <td>Integer</td>
+            <td>Maximum number of entries in the checkpoint statistics cache.</td>
+        </tr>
+        <tr>
+            <td><h5>rest.cache.checkpoint-statistics.timeout</h5></td>
+            <td style="word-wrap: break-word;">3 s</td>
+            <td>Duration</td>
+            <td>Duration from write after which cached checkpoints statistics are cleaned up. For backwards compatibility, if no value is configured, <code class="highlighter-rouge">web.refresh-interval</code> will be used instead.</td>
+        </tr>
         <tr>
             <td><h5>rest.client.max-content-length</h5></td>
             <td style="word-wrap: break-word;">104857600</td>
diff --git a/docs/layouts/shortcodes/generated/rest_configuration.html b/docs/layouts/shortcodes/generated/rest_configuration.html
index f22f95c7488..467b4275acd 100644
--- a/docs/layouts/shortcodes/generated/rest_configuration.html
+++ b/docs/layouts/shortcodes/generated/rest_configuration.html
@@ -38,6 +38,18 @@
             <td>String</td>
             <td>The port that the server binds itself. Accepts a list of ports (“50100,50101”), ranges (“50100-50200”) or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple Rest servers are running on the same machine.</td>
         </tr>
+        <tr>
+            <td><h5>rest.cache.checkpoint-statistics.size</h5></td>
+            <td style="word-wrap: break-word;">1000</td>
+            <td>Integer</td>
+            <td>Maximum number of entries in the checkpoint statistics cache.</td>
+        </tr>
+        <tr>
+            <td><h5>rest.cache.checkpoint-statistics.timeout</h5></td>
+            <td style="word-wrap: break-word;">3 s</td>
+            <td>Duration</td>
+            <td>Duration from write after which cached checkpoints statistics are cleaned up. For backwards compatibility, if no value is configured, <code class="highlighter-rouge">web.refresh-interval</code> will be used instead.</td>
+        </tr>
         <tr>
             <td><h5>rest.client.max-content-length</h5></td>
             <td style="word-wrap: break-word;">104857600</td>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
index 165be2f3415..d3c1a6baa90 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.description.Description;
 import java.time.Duration;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
+import static org.apache.flink.configuration.description.TextElement.code;
 import static org.apache.flink.configuration.description.TextElement.text;
 
 /** Configuration parameters for REST communication. */
@@ -193,6 +194,29 @@ public class RestOptions {
                                     + "Lowering the thread priority will give Flink's main components more CPU time whereas "
                                     + "increasing will allocate more time for the REST server's processing.");
 
+    /** Duration from write, after which cached checkpoints statistics are cleaned up. */
+    @Documentation.Section(Documentation.Sections.EXPERT_REST)
+    public static final ConfigOption<Duration> CACHE_CHECKPOINT_STATISTICS_TIMEOUT =
+            key("rest.cache.checkpoint-statistics.timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofMillis(WebOptions.REFRESH_INTERVAL.defaultValue()))
+                    .withFallbackKeys(WebOptions.REFRESH_INTERVAL.key())
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Duration from write after which cached checkpoints statistics are cleaned up. For backwards compatibility, if no value is configured, %s will be used instead.",
+                                            code(WebOptions.REFRESH_INTERVAL.key()))
+                                    .build());
+
+    /** Maximum number of entries in the checkpoint statistics cache. */
+    @Documentation.Section(Documentation.Sections.EXPERT_REST)
+    public static final ConfigOption<Integer> CACHE_CHECKPOINT_STATISTICS_SIZE =
+            key("rest.cache.checkpoint-statistics.size")
+                    .intType()
+                    .defaultValue(1000)
+                    .withDescription(
+                            "Maximum number of entries in the checkpoint statistics cache.");
+
     /** Enables the experimental flame graph feature. */
     @Documentation.Section(Documentation.Sections.EXPERT_REST)
     public static final ConfigOption<Boolean> ENABLE_FLAMEGRAPH =
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index d7d23256cbc..d2932562423 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -38,6 +38,7 @@ import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.core.failure.FailureEnricher;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.checkpoint.Checkpoints;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
@@ -937,6 +938,13 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId>
                 .exceptionally(checkExecutionGraphStoreOnException);
     }
 
+    @Override
+    public CompletableFuture<CheckpointStatsSnapshot> requestCheckpointStats(
+            JobID jobId, Time timeout) {
+        return performOperationOnJobMasterGateway(
+                jobId, gateway -> gateway.requestCheckpointStats(timeout));
+    }
+
     @Override
     public CompletableFuture<JobResult> requestJobResult(JobID jobId, Time timeout) {
         if (!jobManagerRunnerRegistry.isRegistered(jobId)) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 6980824517a..b80cab39abf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.blocklist.BlocklistContext;
 import org.apache.flink.runtime.blocklist.BlocklistHandler;
 import org.apache.flink.runtime.blocklist.BlocklistUtils;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -868,6 +869,11 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId>
         return CompletableFuture.completedFuture(schedulerNG.requestJob());
     }
 
+    @Override
+    public CompletableFuture<CheckpointStatsSnapshot> requestCheckpointStats(Time timeout) {
+        return CompletableFuture.completedFuture(schedulerNG.requestCheckpointStats());
+    }
+
     @Override
     public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(
             final CheckpointType checkpointType, final Time timeout) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index e7becbe5189..6c1b79568a8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -26,6 +26,7 @@ import org.apache.flink.core.execution.CheckpointType;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.runtime.blocklist.BlocklistListener;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorGateway;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -206,6 +207,14 @@ public interface JobMasterGateway
      */
     CompletableFuture<ExecutionGraphInfo> requestJob(@RpcTimeout Time timeout);
 
+    /**
+     * Requests the {@link CheckpointStatsSnapshot} of the job.
+     *
+     * @param timeout for the rpc call
+     * @return Future which is completed with the {@link CheckpointStatsSnapshot} of the job
+     */
+    CompletableFuture<CheckpointStatsSnapshot> requestCheckpointStats(@RpcTimeout Time timeout);
+
     /**
      * Triggers taking a savepoint of the executed job.
      *
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
index 9c8de50b21c..c60c6bfe7fd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
@@ -21,17 +21,23 @@ package org.apache.flink.runtime.rest.handler;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ClusterOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.util.Preconditions;
 
 import java.io.File;
+import java.time.Duration;
 
 /** Configuration object containing values for the rest handler configuration. */
 public class RestHandlerConfiguration {
 
     private final long refreshInterval;
 
-    private final int maxCheckpointStatisticCacheEntries;
+    private final int checkpointHistorySize;
+
+    private final Duration checkpointCacheExpireAfterWrite;
+
+    private final int checkpointCacheSize;
 
     private final Time timeout;
 
@@ -45,7 +51,9 @@ public class RestHandlerConfiguration {
 
     public RestHandlerConfiguration(
             long refreshInterval,
-            int maxCheckpointStatisticCacheEntries,
+            int checkpointHistorySize,
+            Duration checkpointCacheExpireAfterWrite,
+            int checkpointCacheSize,
             Time timeout,
             File webUiDir,
             boolean webSubmitEnabled,
@@ -55,7 +63,9 @@ public class RestHandlerConfiguration {
                 refreshInterval > 0L, "The refresh interval (ms) should be larger than 0.");
         this.refreshInterval = refreshInterval;
 
-        this.maxCheckpointStatisticCacheEntries = maxCheckpointStatisticCacheEntries;
+        this.checkpointHistorySize = checkpointHistorySize;
+        this.checkpointCacheExpireAfterWrite = checkpointCacheExpireAfterWrite;
+        this.checkpointCacheSize = checkpointCacheSize;
 
         this.timeout = Preconditions.checkNotNull(timeout);
         this.webUiDir = Preconditions.checkNotNull(webUiDir);
@@ -68,8 +78,16 @@ public class RestHandlerConfiguration {
         return refreshInterval;
     }
 
-    public int getMaxCheckpointStatisticCacheEntries() {
-        return maxCheckpointStatisticCacheEntries;
+    public int getCheckpointHistorySize() {
+        return checkpointHistorySize;
+    }
+
+    public Duration getCheckpointCacheExpireAfterWrite() {
+        return checkpointCacheExpireAfterWrite;
+    }
+
+    public int getCheckpointCacheSize() {
+        return checkpointCacheSize;
     }
 
     public Time getTimeout() {
@@ -95,8 +113,14 @@ public class RestHandlerConfiguration {
     public static RestHandlerConfiguration fromConfiguration(Configuration configuration) {
         final long refreshInterval = configuration.getLong(WebOptions.REFRESH_INTERVAL);
 
-        final int maxCheckpointStatisticCacheEntries =
+        final int checkpointHistorySize =
                 configuration.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE);
+        final Duration checkpointStatsSnapshotCacheExpireAfterWrite =
+                configuration
+                        .getOptional(RestOptions.CACHE_CHECKPOINT_STATISTICS_TIMEOUT)
+                        .orElse(Duration.ofMillis(refreshInterval));
+        final int checkpointStatsSnapshotCacheSize =
+                configuration.get(RestOptions.CACHE_CHECKPOINT_STATISTICS_SIZE);
 
         final Time timeout = Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT));
 
@@ -113,7 +137,9 @@ public class RestHandlerConfiguration {
 
         return new RestHandlerConfiguration(
                 refreshInterval,
-                maxCheckpointStatisticCacheEntries,
+                checkpointHistorySize,
+                checkpointStatsSnapshotCacheExpireAfterWrite,
+                checkpointStatsSnapshotCacheSize,
                 timeout,
                 webUiDir,
                 webSubmitEnabled,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointHandler.java
index 61a4480d64b..b8f86a9f885 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointHandler.java
@@ -18,15 +18,14 @@
 
 package org.apache.flink.runtime.rest.handler.job.checkpoints;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
-import org.apache.flink.runtime.rest.handler.job.AbstractAccessExecutionGraphHandler;
-import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointIdPathParameter;
@@ -35,9 +34,11 @@ import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.flink.shaded.guava31.com.google.common.cache.Cache;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
 /**
@@ -47,7 +48,7 @@ import java.util.concurrent.Executor;
  */
 public abstract class AbstractCheckpointHandler<
                 R extends ResponseBody, M extends CheckpointMessageParameters>
-        extends AbstractAccessExecutionGraphHandler<R, M> {
+        extends AbstractCheckpointStatsHandler<R, M> {
 
     private final CheckpointStatsCache checkpointStatsCache;
 
@@ -56,29 +57,28 @@ public abstract class AbstractCheckpointHandler<
             Time timeout,
             Map<String, String> responseHeaders,
             MessageHeaders<EmptyRequestBody, R, M> messageHeaders,
-            ExecutionGraphCache executionGraphCache,
             Executor executor,
+            Cache<JobID, CompletableFuture<CheckpointStatsSnapshot>> checkpointStatsSnapshotCache,
             CheckpointStatsCache checkpointStatsCache) {
         super(
                 leaderRetriever,
                 timeout,
                 responseHeaders,
                 messageHeaders,
-                executionGraphCache,
+                checkpointStatsSnapshotCache,
                 executor);
 
         this.checkpointStatsCache = Preconditions.checkNotNull(checkpointStatsCache);
     }
 
     @Override
-    protected R handleRequest(
-            HandlerRequest<EmptyRequestBody> request, AccessExecutionGraph executionGraph)
+    protected R handleCheckpointStatsRequest(
+            HandlerRequest<EmptyRequestBody> request,
+            CheckpointStatsSnapshot checkpointStatsSnapshot)
             throws RestHandlerException {
+        JobID jobId = request.getPathParameter(JobIDPathParameter.class);
         final long checkpointId = request.getPathParameter(CheckpointIdPathParameter.class);
 
-        final CheckpointStatsSnapshot checkpointStatsSnapshot =
-                executionGraph.getCheckpointStatsSnapshot();
-
         if (checkpointStatsSnapshot != null) {
             AbstractCheckpointStats checkpointStats =
                     checkpointStatsSnapshot.getHistory().getCheckpointById(checkpointId);
@@ -100,7 +100,7 @@ public abstract class AbstractCheckpointHandler<
             }
         } else {
             throw new RestHandlerException(
-                    "Checkpointing was not enabled for job " + executionGraph.getJobID() + '.',
+                    "Checkpointing was not enabled for job " + jobId + '.',
                     HttpResponseStatus.NOT_FOUND);
         }
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointStatsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointStatsHandler.java
new file mode 100644
index 00000000000..60d009133f5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointStatsHandler.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.checkpoints;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.rest.NotFoundException;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.apache.flink.shaded.guava31.com.google.common.cache.Cache;
+
+import javax.annotation.Nonnull;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+
+/**
+ * Abstract class for checkpoint handlers that will cache the {@link CheckpointStatsSnapshot}
+ * object.
+ *
+ * @param <R> the response type
+ * @param <M> the message parameters
+ */
+@Internal
+public abstract class AbstractCheckpointStatsHandler<
+                R extends ResponseBody, M extends JobMessageParameters>
+        extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, R, M> {
+
+    private final Executor executor;
+    private final Cache<JobID, CompletableFuture<CheckpointStatsSnapshot>>
+            checkpointStatsSnapshotCache;
+
+    protected AbstractCheckpointStatsHandler(
+            GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+            Time timeout,
+            Map<String, String> responseHeaders,
+            MessageHeaders<EmptyRequestBody, R, M> messageHeaders,
+            Cache<JobID, CompletableFuture<CheckpointStatsSnapshot>> checkpointStatsSnapshotCache,
+            Executor executor) {
+        super(leaderRetriever, timeout, responseHeaders, messageHeaders);
+        this.executor = executor;
+        this.checkpointStatsSnapshotCache = checkpointStatsSnapshotCache;
+    }
+
+    @Override
+    protected CompletableFuture<R> handleRequest(
+            @Nonnull HandlerRequest<EmptyRequestBody> request, @Nonnull RestfulGateway gateway)
+            throws RestHandlerException {
+        JobID jobId = request.getPathParameter(JobIDPathParameter.class);
+
+        try {
+            return checkpointStatsSnapshotCache
+                    .get(jobId, () -> gateway.requestCheckpointStats(jobId, timeout))
+                    .thenApplyAsync(
+                            checkpointStatsSnapshot -> {
+                                try {
+                                    return handleCheckpointStatsRequest(
+                                            request, checkpointStatsSnapshot);
+                                } catch (RestHandlerException e) {
+                                    throw new CompletionException(e);
+                                }
+                            },
+                            executor)
+                    .exceptionally(
+                            throwable -> {
+                                throwable = ExceptionUtils.stripCompletionException(throwable);
+                                if (throwable instanceof FlinkJobNotFoundException) {
+                                    throw new CompletionException(
+                                            new NotFoundException(
+                                                    String.format("Job %s not found", jobId),
+                                                    throwable));
+                                } else {
+                                    throw new CompletionException(throwable);
+                                }
+                            });
+        } catch (ExecutionException e) {
+            CompletableFuture<R> future = new CompletableFuture<>();
+            future.completeExceptionally(e);
+            return future;
+        }
+    }
+
+    protected abstract R handleCheckpointStatsRequest(
+            HandlerRequest<EmptyRequestBody> request,
+            CheckpointStatsSnapshot checkpointStatsSnapshot)
+            throws RestHandlerException;
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java
index cea6da26eb5..ac41bc71fe3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java
@@ -18,13 +18,13 @@
 
 package org.apache.flink.runtime.rest.handler.job.checkpoints;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
-import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
@@ -37,12 +37,15 @@ import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.OnlyExecutionGraphJsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
+import org.apache.flink.shaded.guava31.com.google.common.cache.Cache;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
 /** REST handler which returns the details for a checkpoint. */
@@ -56,16 +59,16 @@ public class CheckpointStatisticDetailsHandler
             Map<String, String> responseHeaders,
             MessageHeaders<EmptyRequestBody, CheckpointStatistics, CheckpointMessageParameters>
                     messageHeaders,
-            ExecutionGraphCache executionGraphCache,
             Executor executor,
+            Cache<JobID, CompletableFuture<CheckpointStatsSnapshot>> checkpointStatsSnapshotCache,
             CheckpointStatsCache checkpointStatsCache) {
         super(
                 leaderRetriever,
                 timeout,
                 responseHeaders,
                 messageHeaders,
-                executionGraphCache,
                 executor,
+                checkpointStatsSnapshotCache,
                 checkpointStatsCache);
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java
index 7bc4c9214c1..6accdd40983 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.rest.handler.job.checkpoints;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsCounts;
@@ -28,8 +29,6 @@ import org.apache.flink.runtime.checkpoint.RestoredCheckpointStats;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
-import org.apache.flink.runtime.rest.handler.job.AbstractAccessExecutionGraphHandler;
-import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
 import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
@@ -44,6 +43,7 @@ import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.OnlyExecutionGraphJsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
+import org.apache.flink.shaded.guava31.com.google.common.cache.Cache;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
 import java.io.IOException;
@@ -52,11 +52,12 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
 /** Handler which serves the checkpoint statistics. */
 public class CheckpointingStatisticsHandler
-        extends AbstractAccessExecutionGraphHandler<CheckpointingStatistics, 
JobMessageParameters>
+        extends AbstractCheckpointStatsHandler<CheckpointingStatistics, 
JobMessageParameters>
         implements OnlyExecutionGraphJsonArchivist {
 
     public CheckpointingStatisticsHandler(
@@ -65,22 +66,23 @@ public class CheckpointingStatisticsHandler
             Map<String, String> responseHeaders,
             MessageHeaders<EmptyRequestBody, CheckpointingStatistics, 
JobMessageParameters>
                     messageHeaders,
-            ExecutionGraphCache executionGraphCache,
+            Cache<JobID, CompletableFuture<CheckpointStatsSnapshot>> 
checkpointStatsSnapshotCache,
             Executor executor) {
         super(
                 leaderRetriever,
                 timeout,
                 responseHeaders,
                 messageHeaders,
-                executionGraphCache,
+                checkpointStatsSnapshotCache,
                 executor);
     }
 
     @Override
-    protected CheckpointingStatistics handleRequest(
-            HandlerRequest<EmptyRequestBody> request, AccessExecutionGraph 
executionGraph)
+    protected CheckpointingStatistics handleCheckpointStatsRequest(
+            HandlerRequest<EmptyRequestBody> request,
+            CheckpointStatsSnapshot checkpointStatsSnapshot)
             throws RestHandlerException {
-        return createCheckpointingStatistics(executionGraph);
+        return createCheckpointingStatistics(checkpointStatsSnapshot);
     }
 
     @Override
@@ -88,7 +90,7 @@ public class CheckpointingStatisticsHandler
             throws IOException {
         ResponseBody json;
         try {
-            json = createCheckpointingStatistics(graph);
+            json = 
createCheckpointingStatistics(graph.getCheckpointStatsSnapshot());
         } catch (RestHandlerException rhe) {
             json = new ErrorResponseBody(rhe.getMessage());
         }
@@ -100,10 +102,7 @@ public class CheckpointingStatisticsHandler
     }
 
     private static CheckpointingStatistics createCheckpointingStatistics(
-            AccessExecutionGraph executionGraph) throws RestHandlerException {
-        final CheckpointStatsSnapshot checkpointStatsSnapshot =
-                executionGraph.getCheckpointStatsSnapshot();
-
+            CheckpointStatsSnapshot checkpointStatsSnapshot) throws 
RestHandlerException {
         if (checkpointStatsSnapshot == null) {
             throw new RestHandlerException(
                     "Checkpointing has not been enabled.",
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java
index 3f5e0f16c9f..1f0b7e5ed1e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.rest.handler.job.checkpoints;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
@@ -30,7 +31,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.rest.NotFoundException;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
-import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
@@ -46,12 +46,15 @@ import 
org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import 
org.apache.flink.runtime.webmonitor.history.OnlyExecutionGraphJsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
+import org.apache.flink.shaded.guava31.com.google.common.cache.Cache;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
 /** REST handler which serves checkpoint statistics for subtasks. */
@@ -69,16 +72,16 @@ public class TaskCheckpointStatisticDetailsHandler
                             TaskCheckpointStatisticsWithSubtaskDetails,
                             TaskCheckpointMessageParameters>
                     messageHeaders,
-            ExecutionGraphCache executionGraphCache,
             Executor executor,
+            Cache<JobID, CompletableFuture<CheckpointStatsSnapshot>> 
checkpointStatsSnapshotCache,
             CheckpointStatsCache checkpointStatsCache) {
         super(
                 leaderRetriever,
                 timeout,
                 responseHeaders,
                 messageHeaders,
-                executionGraphCache,
                 executor,
+                checkpointStatsSnapshotCache,
                 checkpointStatsCache);
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index bacb0fa7718..b64bcd9e44b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -39,6 +39,7 @@ import 
org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointScheduling;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
@@ -799,6 +800,12 @@ public abstract class SchedulerBase implements 
SchedulerNG, CheckpointScheduling
                 ArchivedExecutionGraph.createFrom(executionGraph), 
getExceptionHistory());
     }
 
+    @Override
+    public CheckpointStatsSnapshot requestCheckpointStats() {
+        mainThreadExecutor.assertRunningInMainThread();
+        return executionGraph.getCheckpointStatsSnapshot();
+    }
+
     @Override
     public JobStatus requestJobStatus() {
         return executionGraph.getState();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
index 0da92a805cf..7887dc8abba 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
@@ -26,6 +26,7 @@ import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
@@ -93,6 +94,15 @@ public interface SchedulerNG extends GlobalFailureHandler, 
AutoCloseableAsync {
 
     ExecutionGraphInfo requestJob();
 
+    /**
+     * Returns the checkpoint statistics for a given job. Although the {@link
+     * CheckpointStatsSnapshot} is included in the {@link ExecutionGraphInfo}, 
this method is
+     * preferred to {@link SchedulerNG#requestJob()} because it is less 
expensive.
+     *
+     * @return checkpoint statistics snapshot for job graph
+     */
+    CheckpointStatsSnapshot requestCheckpointStats();
+
     JobStatus requestJobStatus();
 
     JobDetails requestJobDetails();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index ddd6fd5ea95..d4b9d65d403 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -39,6 +39,7 @@ import 
org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointScheduling;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
@@ -593,6 +594,11 @@ public class AdaptiveScheduler
         return new ExecutionGraphInfo(state.getJob(), 
exceptionHistory.toArrayList());
     }
 
+    @Override
+    public CheckpointStatsSnapshot requestCheckpointStats() {
+        return state.getJob().getCheckpointStatsSnapshot();
+    }
+
     @Override
     public void archiveFailure(RootExceptionHistoryEntry failure) {
         exceptionHistory.add(failure);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/NonLeaderRetrievalRestfulGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/NonLeaderRetrievalRestfulGateway.java
index d0be629e0e4..28cc793c1c5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/NonLeaderRetrievalRestfulGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/NonLeaderRetrievalRestfulGateway.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.webmonitor;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.messages.Acknowledge;
@@ -69,6 +70,12 @@ public class NonLeaderRetrievalRestfulGateway implements 
RestfulGateway {
         throw new UnsupportedOperationException(MESSAGE);
     }
 
+    @Override
+    public CompletableFuture<CheckpointStatsSnapshot> requestCheckpointStats(
+            JobID jobId, Time timeout) {
+        throw new UnsupportedOperationException(MESSAGE);
+    }
+
     @Override
     public CompletableFuture<JobResult> requestJobResult(JobID jobId, Time 
timeout) {
         throw new UnsupportedOperationException(MESSAGE);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
index 2d3b5363ca9..7aa246bd705 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.execution.CheckpointType;
 import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.dispatcher.TriggerSavepointMode;
@@ -95,6 +96,16 @@ public interface RestfulGateway extends RpcGateway {
     CompletableFuture<ExecutionGraphInfo> requestExecutionGraphInfo(
             JobID jobId, @RpcTimeout Time timeout);
 
+    /**
+     * Requests the {@link CheckpointStatsSnapshot} containing checkpointing 
information.
+     *
+     * @param jobId identifying the job whose {@link CheckpointStatsSnapshot} 
is requested
+     * @param timeout for the asynchronous operation
+     * @return Future containing the {@link CheckpointStatsSnapshot} for the 
given jobId
+     */
+    CompletableFuture<CheckpointStatsSnapshot> requestCheckpointStats(
+            JobID jobId, @RpcTimeout Time timeout);
+
     /**
      * Requests the {@link JobResult} of a job specified by the given jobId.
      *
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index c653866b28b..7153f320ef9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.AkkaOptions;
@@ -25,6 +26,7 @@ import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.runtime.blob.TransientBlobService;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElection;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
@@ -163,6 +165,8 @@ import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 import org.apache.flink.util.concurrent.FutureUtils;
 
+import org.apache.flink.shaded.guava31.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
 
 import javax.annotation.Nullable;
@@ -199,6 +203,8 @@ public class WebMonitorEndpoint<T extends RestfulGateway> 
extends RestServerEndp
 
     private final ExecutionGraphCache executionGraphCache;
     private final CheckpointStatsCache checkpointStatsCache;
+    private final Cache<JobID, CompletableFuture<CheckpointStatsSnapshot>>
+            checkpointStatsSnapshotCache;
 
     private final MetricFetcher metricFetcher;
 
@@ -235,7 +241,12 @@ public class WebMonitorEndpoint<T extends RestfulGateway> 
extends RestServerEndp
         this.executionGraphCache = executionGraphCache;
 
         this.checkpointStatsCache =
-                new 
CheckpointStatsCache(restConfiguration.getMaxCheckpointStatisticCacheEntries());
+                new 
CheckpointStatsCache(restConfiguration.getCheckpointHistorySize());
+        this.checkpointStatsSnapshotCache =
+                CacheBuilder.newBuilder()
+                        
.maximumSize(restConfiguration.getCheckpointCacheSize())
+                        
.expireAfterWrite(restConfiguration.getCheckpointCacheExpireAfterWrite())
+                        .build();
 
         this.metricFetcher = metricFetcher;
 
@@ -372,7 +383,7 @@ public class WebMonitorEndpoint<T extends RestfulGateway> 
extends RestServerEndp
                         timeout,
                         responseHeaders,
                         CheckpointingStatisticsHeaders.getInstance(),
-                        executionGraphCache,
+                        checkpointStatsSnapshotCache,
                         executor);
 
         CheckpointStatisticDetailsHandler checkpointStatisticDetailsHandler =
@@ -381,8 +392,8 @@ public class WebMonitorEndpoint<T extends RestfulGateway> 
extends RestServerEndp
                         timeout,
                         responseHeaders,
                         CheckpointStatisticDetailsHeaders.getInstance(),
-                        executionGraphCache,
                         executor,
+                        checkpointStatsSnapshotCache,
                         checkpointStatsCache);
 
         JobPlanHandler jobPlanHandler =
@@ -400,8 +411,8 @@ public class WebMonitorEndpoint<T extends RestfulGateway> 
extends RestServerEndp
                         timeout,
                         responseHeaders,
                         TaskCheckpointStatisticsHeaders.getInstance(),
-                        executionGraphCache,
                         executor,
+                        checkpointStatsSnapshotCache,
                         checkpointStatsCache);
 
         JobExceptionsHandler jobExceptionsHandler =
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 8e95b5150df..d4f0ffc2b2e 100755
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.core.testutils.FlinkMatchers;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.checkpoint.Checkpoints;
 import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
 import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
@@ -65,6 +66,7 @@ import 
org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFact
 import org.apache.flink.runtime.jobmaster.factories.JobMasterServiceFactory;
 import 
org.apache.flink.runtime.jobmaster.factories.JobMasterServiceProcessFactory;
 import 
org.apache.flink.runtime.jobmaster.factories.TestingJobMasterServiceFactory;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
 import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
 import org.apache.flink.runtime.leaderelection.LeaderElection;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElection;
@@ -655,6 +657,32 @@ public class DispatcherTest extends AbstractDispatcherTest 
{
         assertThat(completableFutureCompletableFuture.get(), 
is(failedExecutionGraphInfo));
     }
 
+    @Test
+    public void testRetrieveCheckpointStats() throws Exception {
+        CheckpointStatsSnapshot snapshot = CheckpointStatsSnapshot.empty();
+        TestingJobMasterGateway testingJobMasterGateway =
+                new TestingJobMasterGatewayBuilder()
+                        .setCheckpointStatsSnapshotSupplier(
+                                () -> 
CompletableFuture.completedFuture(snapshot))
+                        .build();
+
+        dispatcher =
+                createAndStartDispatcher(
+                        heartbeatServices,
+                        haServices,
+                        new TestingJobMasterGatewayJobManagerRunnerFactory(
+                                testingJobMasterGateway));
+        DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
+
+        dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+
+        CompletableFuture<CheckpointStatsSnapshot> resultsFuture =
+                dispatcher.callAsyncInMainThread(
+                        () -> dispatcher.requestCheckpointStats(jobId, 
TIMEOUT));
+        
Assertions.assertThat(resultsFuture).succeedsWithin(Duration.ofSeconds(1));
+        Assertions.assertThat(resultsFuture).isCompletedWithValue(snapshot);
+    }
+
     @Test
     public void testThrowExceptionIfJobExecutionResultNotFound() throws 
Exception {
         dispatcher =
@@ -1752,6 +1780,45 @@ public class DispatcherTest extends 
AbstractDispatcherTest {
         }
     }
 
+    private static final class TestingJobMasterGatewayJobManagerRunnerFactory
+            extends TestingJobMasterServiceLeadershipRunnerFactory {
+        private final TestingJobMasterGateway testingJobMasterGateway;
+
+        private TestingJobMasterGatewayJobManagerRunnerFactory(
+                TestingJobMasterGateway testingJobMasterGateway) {
+            this.testingJobMasterGateway = testingJobMasterGateway;
+        }
+
+        @Override
+        public TestingJobManagerRunner createJobManagerRunner(
+                JobGraph jobGraph,
+                Configuration configuration,
+                RpcService rpcService,
+                HighAvailabilityServices highAvailabilityServices,
+                HeartbeatServices heartbeatServices,
+                JobManagerSharedServices jobManagerServices,
+                JobManagerJobMetricGroupFactory 
jobManagerJobMetricGroupFactory,
+                FatalErrorHandler fatalErrorHandler,
+                Collection<FailureEnricher> failureEnrichers,
+                long initializationTimestamp)
+                throws Exception {
+            TestingJobManagerRunner runner =
+                    super.createJobManagerRunner(
+                            jobGraph,
+                            configuration,
+                            rpcService,
+                            highAvailabilityServices,
+                            heartbeatServices,
+                            jobManagerServices,
+                            jobManagerJobMetricGroupFactory,
+                            fatalErrorHandler,
+                            failureEnrichers,
+                            initializationTimestamp);
+            runner.completeJobMasterGatewayFuture(testingJobMasterGateway);
+            return runner;
+        }
+    }
+
     private static final class ExpectedJobIdJobManagerRunnerFactory
             implements JobManagerRunnerFactory {
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 9d0b06a7277..00ef90ac807 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -42,6 +42,7 @@ import 
org.apache.flink.runtime.blocklist.DefaultBlocklistHandler;
 import org.apache.flink.runtime.checkpoint.CheckpointProperties;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory;
@@ -2097,6 +2098,42 @@ class JobMasterTest {
         }
     }
 
+    @Test
+    void testRetrievingCheckpointStats() throws Exception {
+        // create savepoint data
+        final long savepointId = 42L;
+        final File savepointFile = createSavepoint(savepointId);
+
+        // set savepoint settings
+        final SavepointRestoreSettings savepointRestoreSettings =
+                
SavepointRestoreSettings.forPath(savepointFile.getAbsolutePath(), true);
+        final JobGraph jobGraph = 
createJobGraphWithCheckpointing(savepointRestoreSettings);
+
+        final StandaloneCompletedCheckpointStore completedCheckpointStore =
+                new StandaloneCompletedCheckpointStore(1);
+        final CheckpointRecoveryFactory testingCheckpointRecoveryFactory =
+                PerJobCheckpointRecoveryFactory.withoutCheckpointStoreRecovery(
+                        maxCheckpoints -> completedCheckpointStore);
+        
haServices.setCheckpointRecoveryFactory(testingCheckpointRecoveryFactory);
+
+        try (final JobMaster jobMaster =
+                new JobMasterBuilder(jobGraph, rpcService)
+                        .withHighAvailabilityServices(haServices)
+                        .createJobMaster()) {
+
+            // we need to start and register the required slots to let the 
adaptive scheduler
+            // restore from the savepoint
+            jobMaster.start();
+
+            CheckpointStatsSnapshot checkpointStatsSnapshot =
+                    
jobMaster.getGateway().requestCheckpointStats(testingTimeout).get();
+
+            // assert that the checkpoint snapshot reflects the latest 
restored checkpoint
+            
assertThat(checkpointStatsSnapshot.getLatestRestoredCheckpoint().getCheckpointId())
+                    .isEqualTo(savepointId);
+        }
+    }
+
     private TestingResourceManagerGateway createResourceManagerGateway(
             CompletableFuture<Collection<BlockedNode>> 
firstReceivedBlockedNodeFuture,
             CompletableFuture<Collection<BlockedNode>> 
secondReceivedBlockedNodeFuture,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java
index dd049e05371..24c345f7355 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java
@@ -118,7 +118,7 @@ public class TestingJobManagerRunner implements 
JobManagerRunner {
 
     @Override
     public boolean isInitialized() {
-        throw new UnsupportedOperationException();
+        return true;
     }
 
     @Override
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
index bf4d49a91ec..77674bcef64 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
@@ -29,6 +29,7 @@ import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.blocklist.BlockedNode;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -131,6 +132,10 @@ public class TestingJobMasterGateway implements 
JobMasterGateway {
 
     @Nonnull private final Supplier<CompletableFuture<ExecutionGraphInfo>> 
requestJobSupplier;
 
+    @Nonnull
+    private final Supplier<CompletableFuture<CheckpointStatsSnapshot>>
+            checkpointStatsSnapshotSupplier;
+
     @Nonnull
     private final TriFunction<String, Boolean, SavepointFormatType, 
CompletableFuture<String>>
             triggerSavepointFunction;
@@ -240,6 +245,9 @@ public class TestingJobMasterGateway implements 
JobMasterGateway {
             @Nonnull Function<ResourceID, CompletableFuture<Void>> 
resourceManagerHeartbeatFunction,
             @Nonnull Supplier<CompletableFuture<JobDetails>> 
requestJobDetailsSupplier,
             @Nonnull Supplier<CompletableFuture<ExecutionGraphInfo>> 
requestJobSupplier,
+            @Nonnull
+                    Supplier<CompletableFuture<CheckpointStatsSnapshot>>
+                            checkpointStatsSnapshotSupplier,
             @Nonnull
                     TriFunction<String, Boolean, SavepointFormatType, 
CompletableFuture<String>>
                             triggerSavepointFunction,
@@ -320,6 +328,7 @@ public class TestingJobMasterGateway implements 
JobMasterGateway {
         this.resourceManagerHeartbeatFunction = 
resourceManagerHeartbeatFunction;
         this.requestJobDetailsSupplier = requestJobDetailsSupplier;
         this.requestJobSupplier = requestJobSupplier;
+        this.checkpointStatsSnapshotSupplier = checkpointStatsSnapshotSupplier;
         this.triggerSavepointFunction = triggerSavepointFunction;
         this.triggerCheckpointFunction = triggerCheckpointFunction;
         this.stopWithSavepointFunction = stopWithSavepointFunction;
@@ -417,6 +426,11 @@ public class TestingJobMasterGateway implements 
JobMasterGateway {
         return requestJobSupplier.get();
     }
 
+    @Override
+    public CompletableFuture<CheckpointStatsSnapshot> 
requestCheckpointStats(Time timeout) {
+        return checkpointStatsSnapshotSupplier.get();
+    }
+
     @Override
     public CompletableFuture<String> triggerSavepoint(
             @Nullable final String targetDirectory,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java
index 6238736c9e5..acf1a034a82 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java
@@ -27,6 +27,7 @@ import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.blocklist.BlockedNode;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -121,6 +122,8 @@ public class TestingJobMasterGatewayBuilder {
             () -> FutureUtils.completedExceptionally(new 
UnsupportedOperationException());
     private Supplier<CompletableFuture<ExecutionGraphInfo>> requestJobSupplier 
=
             () -> FutureUtils.completedExceptionally(new 
UnsupportedOperationException());
+    private Supplier<CompletableFuture<CheckpointStatsSnapshot>> 
checkpointStatsSnapshotSupplier =
+            () -> FutureUtils.completedExceptionally(new 
UnsupportedOperationException());
     private TriFunction<String, Boolean, SavepointFormatType, 
CompletableFuture<String>>
             triggerSavepointFunction =
                     (targetDirectory, ignoredB, formatType) ->
@@ -289,6 +292,12 @@ public class TestingJobMasterGatewayBuilder {
         return this;
     }
 
+    public TestingJobMasterGatewayBuilder setCheckpointStatsSnapshotSupplier(
+            Supplier<CompletableFuture<CheckpointStatsSnapshot>> 
checkpointStatsSnapshotSupplier) {
+        this.checkpointStatsSnapshotSupplier = checkpointStatsSnapshotSupplier;
+        return this;
+    }
+
     public TestingJobMasterGatewayBuilder setTriggerSavepointFunction(
             TriFunction<String, Boolean, SavepointFormatType, 
CompletableFuture<String>>
                     triggerSavepointFunction) {
@@ -439,6 +448,7 @@ public class TestingJobMasterGatewayBuilder {
                 resourceManagerHeartbeatFunction,
                 requestJobDetailsSupplier,
                 requestJobSupplier,
+                checkpointStatsSnapshotSupplier,
                 triggerSavepointFunction,
                 triggerCheckpointFunction,
                 stopWithSavepointFunction,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/RestHandlerConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/RestHandlerConfigurationTest.java
index fb01fa6a66c..217130844b4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/RestHandlerConfigurationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/RestHandlerConfigurationTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.rest.handler;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.SchedulerExecutionMode;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.util.TestLoggerExtension;
@@ -29,6 +30,8 @@ import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.CsvSource;
 
+import java.time.Duration;
+
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link RestHandlerConfiguration}. */
@@ -103,4 +106,39 @@ class RestHandlerConfigurationTest {
                 RestHandlerConfiguration.fromConfiguration(config);
         assertThat(restHandlerConfiguration.isWebCancelEnabled()).isEqualTo(webCancelEnabled);
     }
+
+    @Test
+    public void testCheckpointCacheExpireAfterWrite() {
+        final Duration testDuration = Duration.ofMillis(100L);
+        final Configuration config = new Configuration();
+        config.set(RestOptions.CACHE_CHECKPOINT_STATISTICS_TIMEOUT, testDuration);
+
+        RestHandlerConfiguration restHandlerConfiguration =
+                RestHandlerConfiguration.fromConfiguration(config);
+        assertThat(restHandlerConfiguration.getCheckpointCacheExpireAfterWrite())
+                .isEqualTo(testDuration);
+    }
+
+    @Test
+    public void testCheckpointCacheExpiryFallbackToRefreshInterval() {
+        final long refreshInterval = 1000L;
+        final Configuration config = new Configuration();
+        config.set(WebOptions.REFRESH_INTERVAL, refreshInterval);
+
+        RestHandlerConfiguration restHandlerConfiguration =
+                RestHandlerConfiguration.fromConfiguration(config);
+        assertThat(restHandlerConfiguration.getCheckpointCacheExpireAfterWrite())
+                .isEqualTo(Duration.ofMillis(1000L));
+    }
+
+    @Test
+    public void testCheckpointCacheSize() {
+        final int testCacheSize = 50;
+        final Configuration config = new Configuration();
+        config.set(RestOptions.CACHE_CHECKPOINT_STATISTICS_SIZE, testCacheSize);
+
+        RestHandlerConfiguration restHandlerConfiguration =
+                RestHandlerConfiguration.fromConfiguration(config);
+        assertThat(restHandlerConfiguration.getCheckpointCacheSize()).isEqualTo(testCacheSize);
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointStatsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointStatsHandlerTest.java
new file mode 100644
index 00000000000..f18cf985c5c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointStatsHandlerTest.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.checkpoints;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
+import org.apache.flink.runtime.rest.messages.checkpoints.StatsSummaryDto;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.concurrent.Executors;
+
+import org.apache.flink.shaded.guava31.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType;
+
+/** Test class for {@link AbstractCheckpointStatsHandler}. */
+public class AbstractCheckpointStatsHandlerTest extends TestLogger {
+
+    private static final Time TIMEOUT = Time.seconds(10);
+
+    private static final JobID JOB_ID = new JobID();
+
+    private static final CheckpointStatsTracker checkpointStatsTracker =
+            new CheckpointStatsTracker(
+                    10, UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup());
+
+    @Test
+    public void testRetrieveSnapshotFromCache() throws Exception {
+        GatewayRetriever<RestfulGateway> leaderRetriever =
+                () -> CompletableFuture.completedFuture(null);
+        CheckpointingStatistics checkpointingStatistics = getTestCheckpointingStatistics();
+        CheckpointStatsSnapshot checkpointStatsSnapshot1 = getTestCheckpointStatsSnapshot();
+
+        // Create a passthrough cache so the latest object will always be returned
+        Cache<JobID, CompletableFuture<CheckpointStatsSnapshot>> cache =
+                CacheBuilder.newBuilder().build();
+
+        try (RecordingCheckpointStatsHandler checkpointStatsHandler =
+                new RecordingCheckpointStatsHandler(
+                        leaderRetriever,
+                        TIMEOUT,
+                        Collections.emptyMap(),
+                        CheckpointingStatisticsHeaders.getInstance(),
+                        cache,
+                        Executors.directExecutor(),
+                        checkpointingStatistics)) {
+            RestfulGateway functioningRestfulGateway =
+                    new TestingRestfulGateway.Builder()
+                            .setRequestCheckpointStatsSnapshotFunction(
+                                    jobID ->
+                                            CompletableFuture.completedFuture(
+                                                    checkpointStatsSnapshot1))
+                            .build();
+            HandlerRequest<EmptyRequestBody> request =
+                    HandlerRequest.resolveParametersAndCreate(
+                            EmptyRequestBody.getInstance(),
+                            new JobMessageParameters(),
+                            Collections.singletonMap(JobIDPathParameter.KEY, JOB_ID.toString()),
+                            Collections.emptyMap(),
+                            Collections.emptyList());
+
+            assertThat(
+                            checkpointStatsHandler
+                                    .handleRequest(request, functioningRestfulGateway)
+                                    .get())
+                    .usingRecursiveComparison()
+                    .isEqualTo(checkpointingStatistics);
+            assertThat(checkpointStatsHandler.getStoredCheckpointStats())
+                    .isEqualTo(checkpointStatsSnapshot1);
+
+            // Refresh the checkpoint stats data
+            CheckpointStatsSnapshot checkpointStatsSnapshot2 = getTestCheckpointStatsSnapshot();
+            RestfulGateway refreshedRestfulGateway =
+                    new TestingRestfulGateway.Builder()
+                            .setRequestCheckpointStatsSnapshotFunction(
+                                    jobID ->
+                                            CompletableFuture.completedFuture(
+                                                    checkpointStatsSnapshot2))
+                            .build();
+            assertThat(checkpointStatsHandler.handleRequest(request, refreshedRestfulGateway).get())
+                    .usingRecursiveComparison()
+                    .isEqualTo(checkpointingStatistics);
+            assertThat(checkpointStatsHandler.getStoredCheckpointStats())
+                    .isEqualTo(checkpointStatsSnapshot2);
+        }
+    }
+
+    @Test
+    public void testRestExceptionPassedThrough() throws Exception {
+        GatewayRetriever<RestfulGateway> leaderRetriever =
+                () -> CompletableFuture.completedFuture(null);
+        CheckpointStatsSnapshot checkpointStatsSnapshot1 = getTestCheckpointStatsSnapshot();
+        RestHandlerException restHandlerException =
+                new RestHandlerException(
+                        "some exception thrown", HttpResponseStatus.INTERNAL_SERVER_ERROR);
+
+        try (ThrowingCheckpointStatsHandler checkpointStatsHandler =
+                new ThrowingCheckpointStatsHandler(
+                        leaderRetriever,
+                        TIMEOUT,
+                        Collections.emptyMap(),
+                        CheckpointingStatisticsHeaders.getInstance(),
+                        CacheBuilder.newBuilder().build(),
+                        Executors.directExecutor(),
+                        restHandlerException)) {
+            RestfulGateway restfulGateway =
+                    new TestingRestfulGateway.Builder()
+                            .setRequestCheckpointStatsSnapshotFunction(
+                                    jobID ->
+                                            CompletableFuture.completedFuture(
+                                                    checkpointStatsSnapshot1))
+                            .build();
+            HandlerRequest<EmptyRequestBody> request =
+                    HandlerRequest.resolveParametersAndCreate(
+                            EmptyRequestBody.getInstance(),
+                            new JobMessageParameters(),
+                            Collections.singletonMap(JobIDPathParameter.KEY, JOB_ID.toString()),
+                            Collections.emptyMap(),
+                            Collections.emptyList());
+
+            assertThatExceptionOfType(ExecutionException.class)
+                    .isThrownBy(
+                            () ->
+                                    checkpointStatsHandler
+                                            .handleRequest(request, restfulGateway)
+                                            .get())
+                    .withCause(restHandlerException);
+        }
+    }
+
+    @Test
+    public void testFlinkJobNotFoundException() throws Exception {
+        GatewayRetriever<RestfulGateway> leaderRetriever =
+                () -> CompletableFuture.completedFuture(null);
+        CheckpointStatsSnapshot checkpointStatsSnapshot1 = getTestCheckpointStatsSnapshot();
+        CompletableFuture<CheckpointStatsSnapshot> failedFuture = new CompletableFuture<>();
+        failedFuture.completeExceptionally(new FlinkJobNotFoundException(JOB_ID));
+
+        try (RecordingCheckpointStatsHandler checkpointStatsHandler =
+                new RecordingCheckpointStatsHandler(
+                        leaderRetriever,
+                        TIMEOUT,
+                        Collections.emptyMap(),
+                        CheckpointingStatisticsHeaders.getInstance(),
+                        CacheBuilder.newBuilder().build(),
+                        Executors.directExecutor(),
+                        null)) {
+            RestfulGateway restfulGateway =
+                    new TestingRestfulGateway.Builder()
+                            .setRequestCheckpointStatsSnapshotFunction(jobID -> failedFuture)
+                            .build();
+            HandlerRequest<EmptyRequestBody> request =
+                    HandlerRequest.resolveParametersAndCreate(
+                            EmptyRequestBody.getInstance(),
+                            new JobMessageParameters(),
+                            Collections.singletonMap(JobIDPathParameter.KEY, JOB_ID.toString()),
+                            Collections.emptyMap(),
+                            Collections.emptyList());
+
+            assertThat(checkpointStatsHandler.handleRequest(request, restfulGateway))
+                    .isCompletedExceptionally()
+                    .failsWithin(Duration.ofSeconds(1))
+                    .withThrowableOfType(ExecutionException.class)
+                    .withStackTraceContaining("Job %s not found", JOB_ID);
+        }
+    }
+
+    private static CheckpointStatsSnapshot getTestCheckpointStatsSnapshot() {
+        return checkpointStatsTracker.createSnapshot();
+    }
+
+    private CheckpointingStatistics getTestCheckpointingStatistics() {
+        final CheckpointingStatistics.Counts counts =
+                new CheckpointingStatistics.Counts(1, 2, 3, 4, 5);
+        final CheckpointingStatistics.Summary summary =
+                new CheckpointingStatistics.Summary(
+                        new StatsSummaryDto(1L, 1L, 1L, 0, 0, 0, 0, 0),
+                        new StatsSummaryDto(1L, 1L, 1L, 0, 0, 0, 0, 0),
+                        new StatsSummaryDto(2L, 2L, 2L, 0, 0, 0, 0, 0),
+                        new StatsSummaryDto(3L, 3L, 3L, 0, 0, 0, 0, 0),
+                        new StatsSummaryDto(4L, 4L, 4L, 0, 0, 0, 0, 0),
+                        new StatsSummaryDto(5L, 5L, 5L, 0, 0, 0, 0, 0));
+        return new CheckpointingStatistics(
+                counts,
+                summary,
+                new CheckpointingStatistics.LatestCheckpoints(null, null, null, null),
+                Collections.emptyList());
+    }
+
+    private static class RecordingCheckpointStatsHandler
+            extends AbstractCheckpointStatsHandler<CheckpointingStatistics, JobMessageParameters> {
+
+        private final CheckpointingStatistics returnValue;
+        private CheckpointStatsSnapshot storedCheckpointStats;
+
+        protected RecordingCheckpointStatsHandler(
+                GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+                Time timeout,
+                Map<String, String> responseHeaders,
+                MessageHeaders<EmptyRequestBody, CheckpointingStatistics, JobMessageParameters>
+                        messageHeaders,
+                Cache<JobID, CompletableFuture<CheckpointStatsSnapshot>>
+                        checkpointStatsSnapshotCache,
+                Executor executor,
+                CheckpointingStatistics returnValue) {
+            super(
+                    leaderRetriever,
+                    timeout,
+                    responseHeaders,
+                    messageHeaders,
+                    checkpointStatsSnapshotCache,
+                    executor);
+            this.returnValue = returnValue;
+        }
+
+        @Override
+        protected CheckpointingStatistics handleCheckpointStatsRequest(
+                HandlerRequest<EmptyRequestBody> request,
+                CheckpointStatsSnapshot checkpointStatsSnapshot)
+                throws RestHandlerException {
+            storedCheckpointStats = checkpointStatsSnapshot;
+            return returnValue;
+        }
+
+        public CheckpointStatsSnapshot getStoredCheckpointStats() {
+            return storedCheckpointStats;
+        }
+    }
+
+    private static class ThrowingCheckpointStatsHandler
+            extends AbstractCheckpointStatsHandler<CheckpointingStatistics, JobMessageParameters> {
+
+        private final RestHandlerException exception;
+
+        protected ThrowingCheckpointStatsHandler(
+                GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+                Time timeout,
+                Map<String, String> responseHeaders,
+                MessageHeaders<EmptyRequestBody, CheckpointingStatistics, JobMessageParameters>
+                        messageHeaders,
+                Cache<JobID, CompletableFuture<CheckpointStatsSnapshot>>
+                        checkpointStatsSnapshotCache,
+                Executor executor,
+                RestHandlerException exception) {
+            super(
+                    leaderRetriever,
+                    timeout,
+                    responseHeaders,
+                    messageHeaders,
+                    checkpointStatsSnapshotCache,
+                    executor);
+            this.exception = exception;
+        }
+
+        @Override
+        protected CheckpointingStatistics handleCheckpointStatsRequest(
+                HandlerRequest<EmptyRequestBody> request,
+                CheckpointStatsSnapshot checkpointStatsSnapshot)
+                throws RestHandlerException {
+            throw exception;
+        }
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java
index 9b774b7ab5a..801689a3aa7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java
@@ -25,6 +25,7 @@ import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.execution.ExecutionState;
@@ -144,6 +145,12 @@ public class TestingSchedulerNG implements SchedulerNG {
         return null;
     }
 
+    @Override
+    public CheckpointStatsSnapshot requestCheckpointStats() {
+        failOperation();
+        return null;
+    }
+
     @Override
     public JobStatus requestJobStatus() {
         return JobStatus.CREATED;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java
index 7136620a907..08b988e2d0f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.execution.CheckpointType;
 import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
@@ -120,6 +121,8 @@ public final class TestingDispatcherGateway extends TestingRestfulGateway
             Function<JobID, CompletableFuture<ArchivedExecutionGraph>> requestJobFunction,
             Function<JobID, CompletableFuture<ExecutionGraphInfo>>
                     requestExecutionGraphInfoFunction,
+            Function<JobID, CompletableFuture<CheckpointStatsSnapshot>>
+                    requestCheckpointStatsSnapshotFunction,
             Function<JobID, CompletableFuture<JobResult>> requestJobResultFunction,
             Function<JobID, CompletableFuture<JobStatus>> requestJobStatusFunction,
             Supplier<CompletableFuture<MultipleJobsDetails>> requestMultipleJobDetailsSupplier,
@@ -173,6 +176,7 @@ public final class TestingDispatcherGateway extends TestingRestfulGateway
                 cancelJobFunction,
                 requestJobFunction,
                 requestExecutionGraphInfoFunction,
+                requestCheckpointStatsSnapshotFunction,
                 requestJobResultFunction,
                 requestJobStatusFunction,
                 requestMultipleJobDetailsSupplier,
@@ -355,6 +359,7 @@ public final class TestingDispatcherGateway extends TestingRestfulGateway
                     cancelJobFunction,
                     requestJobFunction,
                     requestExecutionGraphInfoFunction,
+                    requestCheckpointStatsSnapshotFunction,
                     requestJobResultFunction,
                     requestJobStatusFunction,
                     requestMultipleJobDetailsSupplier,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
index 997a1fe6865..51270543b39 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.execution.CheckpointType;
 import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.dispatcher.TriggerSavepointMode;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
@@ -65,6 +66,10 @@ public class TestingRestfulGateway implements RestfulGateway {
             DEFAULT_REQUEST_EXECUTION_GRAPH_INFO =
                     jobId ->
                             FutureUtils.completedExceptionally(new UnsupportedOperationException());
+    static final Function<JobID, CompletableFuture<CheckpointStatsSnapshot>>
+            DEFAULT_REQUEST_CHECKPOINT_STATS_SNAPSHOT =
+                    jobId ->
+                            FutureUtils.completedExceptionally(new UnsupportedOperationException());
     static final Function<JobID, CompletableFuture<JobStatus>> DEFAULT_REQUEST_JOB_STATUS_FUNCTION =
             jobId -> CompletableFuture.completedFuture(JobStatus.RUNNING);
     static final Supplier<CompletableFuture<MultipleJobsDetails>>
@@ -148,6 +153,9 @@ public class TestingRestfulGateway implements RestfulGateway {
     protected Function<JobID, CompletableFuture<ExecutionGraphInfo>>
             requestExecutionGraphInfoFunction;
 
+    protected Function<JobID, CompletableFuture<CheckpointStatsSnapshot>>
+            requestCheckpointStatsSnapshotFunction;
+
     protected Function<JobID, CompletableFuture<JobResult>> requestJobResultFunction;
 
     protected Function<JobID, CompletableFuture<JobStatus>> requestJobStatusFunction;
@@ -202,6 +210,7 @@ public class TestingRestfulGateway implements RestfulGateway {
                 DEFAULT_CANCEL_JOB_FUNCTION,
                 DEFAULT_REQUEST_JOB_FUNCTION,
                 DEFAULT_REQUEST_EXECUTION_GRAPH_INFO,
+                DEFAULT_REQUEST_CHECKPOINT_STATS_SNAPSHOT,
                 DEFAULT_REQUEST_JOB_RESULT_FUNCTION,
                 DEFAULT_REQUEST_JOB_STATUS_FUNCTION,
                 DEFAULT_REQUEST_MULTIPLE_JOB_DETAILS_SUPPLIER,
@@ -225,6 +234,8 @@ public class TestingRestfulGateway implements RestfulGateway {
             Function<JobID, CompletableFuture<ArchivedExecutionGraph>> requestJobFunction,
             Function<JobID, CompletableFuture<ExecutionGraphInfo>>
                     requestExecutionGraphInfoFunction,
+            Function<JobID, CompletableFuture<CheckpointStatsSnapshot>>
+                    requestCheckpointStatsSnapshotFunction,
             Function<JobID, CompletableFuture<JobResult>> requestJobResultFunction,
             Function<JobID, CompletableFuture<JobStatus>> requestJobStatusFunction,
             Supplier<CompletableFuture<MultipleJobsDetails>> requestMultipleJobDetailsSupplier,
@@ -264,6 +275,7 @@ public class TestingRestfulGateway implements RestfulGateway {
         this.cancelJobFunction = cancelJobFunction;
         this.requestJobFunction = requestJobFunction;
         this.requestExecutionGraphInfoFunction = requestExecutionGraphInfoFunction;
+        this.requestCheckpointStatsSnapshotFunction = requestCheckpointStatsSnapshotFunction;
         this.requestJobResultFunction = requestJobResultFunction;
         this.requestJobStatusFunction = requestJobStatusFunction;
         this.requestMultipleJobDetailsSupplier = requestMultipleJobDetailsSupplier;
@@ -304,6 +316,12 @@ public class TestingRestfulGateway implements RestfulGateway {
         return requestExecutionGraphInfoFunction.apply(jobId);
     }
 
+    @Override
+    public CompletableFuture<CheckpointStatsSnapshot> requestCheckpointStats(
+            JobID jobId, Time timeout) {
+        return requestCheckpointStatsSnapshotFunction.apply(jobId);
+    }
+
     @Override
     public CompletableFuture<JobResult> requestJobResult(JobID jobId, Time timeout) {
         return requestJobResultFunction.apply(jobId);
@@ -410,6 +428,8 @@ public class TestingRestfulGateway implements RestfulGateway {
         protected Function<JobID, CompletableFuture<ArchivedExecutionGraph>> requestJobFunction;
         protected Function<JobID, CompletableFuture<ExecutionGraphInfo>>
                 requestExecutionGraphInfoFunction;
+        protected Function<JobID, CompletableFuture<CheckpointStatsSnapshot>>
+                requestCheckpointStatsSnapshotFunction;
         protected Function<JobID, CompletableFuture<JobResult>> requestJobResultFunction;
         protected Function<JobID, CompletableFuture<JobStatus>> requestJobStatusFunction;
         protected Supplier<CompletableFuture<MultipleJobsDetails>>
@@ -452,6 +472,7 @@ public class TestingRestfulGateway implements RestfulGateway {
             cancelJobFunction = DEFAULT_CANCEL_JOB_FUNCTION;
             requestJobFunction = DEFAULT_REQUEST_JOB_FUNCTION;
             requestExecutionGraphInfoFunction = DEFAULT_REQUEST_EXECUTION_GRAPH_INFO;
+            requestCheckpointStatsSnapshotFunction = DEFAULT_REQUEST_CHECKPOINT_STATS_SNAPSHOT;
             requestJobResultFunction = DEFAULT_REQUEST_JOB_RESULT_FUNCTION;
             requestJobStatusFunction = DEFAULT_REQUEST_JOB_STATUS_FUNCTION;
             requestMultipleJobDetailsSupplier = DEFAULT_REQUEST_MULTIPLE_JOB_DETAILS_SUPPLIER;
@@ -499,6 +520,13 @@ public class TestingRestfulGateway implements RestfulGateway {
             return self();
         }
 
+        public T setRequestCheckpointStatsSnapshotFunction(
+                Function<JobID, CompletableFuture<CheckpointStatsSnapshot>>
+                        requestCheckpointStatsSnapshotFunction) {
+            this.requestCheckpointStatsSnapshotFunction = requestCheckpointStatsSnapshotFunction;
+            return self();
+        }
+
         public T setRequestJobResultFunction(
                 Function<JobID, CompletableFuture<JobResult>> requestJobResultFunction) {
             this.requestJobResultFunction = requestJobResultFunction;
@@ -625,6 +653,7 @@ public class TestingRestfulGateway implements RestfulGateway {
                     cancelJobFunction,
                     requestJobFunction,
                     requestExecutionGraphInfoFunction,
+                    requestCheckpointStatsSnapshotFunction,
                     requestJobResultFunction,
                     requestJobStatusFunction,
                     requestMultipleJobDetailsSupplier,


Reply via email to