This is an automated email from the ASF dual-hosted git repository.
dannycranmer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 7b9b4e53a59 [FLINK-32469][REST] Improve checkpoint REST APIs for programmatic access
7b9b4e53a59 is described below
commit 7b9b4e53a59ab8f4f2a99a6e162a794d264f7daf
Author: Hong Liang Teoh <[email protected]>
AuthorDate: Thu Jun 29 08:32:57 2023 +0100
[FLINK-32469][REST] Improve checkpoint REST APIs for programmatic access
---
.../shortcodes/generated/expert_rest_section.html | 12 +
.../shortcodes/generated/rest_configuration.html | 12 +
.../apache/flink/configuration/RestOptions.java | 24 ++
.../flink/runtime/dispatcher/Dispatcher.java | 8 +
.../apache/flink/runtime/jobmaster/JobMaster.java | 6 +
.../flink/runtime/jobmaster/JobMasterGateway.java | 9 +
.../rest/handler/RestHandlerConfiguration.java | 40 ++-
.../job/checkpoints/AbstractCheckpointHandler.java | 24 +-
.../AbstractCheckpointStatsHandler.java | 119 ++++++++
.../CheckpointStatisticDetailsHandler.java | 9 +-
.../CheckpointingStatisticsHandler.java | 25 +-
.../TaskCheckpointStatisticDetailsHandler.java | 9 +-
.../flink/runtime/scheduler/SchedulerBase.java | 7 +
.../flink/runtime/scheduler/SchedulerNG.java | 10 +
.../scheduler/adaptive/AdaptiveScheduler.java | 6 +
.../NonLeaderRetrievalRestfulGateway.java | 7 +
.../flink/runtime/webmonitor/RestfulGateway.java | 11 +
.../runtime/webmonitor/WebMonitorEndpoint.java | 19 +-
.../flink/runtime/dispatcher/DispatcherTest.java | 67 +++++
.../flink/runtime/jobmaster/JobMasterTest.java | 37 +++
.../runtime/jobmaster/TestingJobManagerRunner.java | 2 +-
.../jobmaster/utils/TestingJobMasterGateway.java | 14 +
.../utils/TestingJobMasterGatewayBuilder.java | 10 +
.../rest/handler/RestHandlerConfigurationTest.java | 38 +++
.../AbstractCheckpointStatsHandlerTest.java | 305 +++++++++++++++++++++
.../runtime/scheduler/TestingSchedulerNG.java | 7 +
.../webmonitor/TestingDispatcherGateway.java | 5 +
.../runtime/webmonitor/TestingRestfulGateway.java | 29 ++
28 files changed, 828 insertions(+), 43 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/expert_rest_section.html
b/docs/layouts/shortcodes/generated/expert_rest_section.html
index 733f7cd0a88..6c0d5e1a643 100644
--- a/docs/layouts/shortcodes/generated/expert_rest_section.html
+++ b/docs/layouts/shortcodes/generated/expert_rest_section.html
@@ -20,6 +20,18 @@
<td>Long</td>
<td>The time in ms that the client waits for the leader address,
e.g., Dispatcher or WebMonitorEndpoint</td>
</tr>
+ <tr>
+ <td><h5>rest.cache.checkpoint-statistics.size</h5></td>
+ <td style="word-wrap: break-word;">1000</td>
+ <td>Integer</td>
+ <td>Maximum number of entries in the checkpoint statistics
cache.</td>
+ </tr>
+ <tr>
+ <td><h5>rest.cache.checkpoint-statistics.timeout</h5></td>
+ <td style="word-wrap: break-word;">3 s</td>
+ <td>Duration</td>
+ <td>Duration from write after which cached checkpoints statistics
are cleaned up. For backwards compatibility, if no value is configured, <code
class="highlighter-rouge">web.refresh-interval</code> will be used instead.</td>
+ </tr>
<tr>
<td><h5>rest.client.max-content-length</h5></td>
<td style="word-wrap: break-word;">104857600</td>
diff --git a/docs/layouts/shortcodes/generated/rest_configuration.html
b/docs/layouts/shortcodes/generated/rest_configuration.html
index f22f95c7488..467b4275acd 100644
--- a/docs/layouts/shortcodes/generated/rest_configuration.html
+++ b/docs/layouts/shortcodes/generated/rest_configuration.html
@@ -38,6 +38,18 @@
<td>String</td>
<td>The port that the server binds itself. Accepts a list of ports
(“50100,50101”), ranges (“50100-50200”) or a combination of both. It is
recommended to set a range of ports to avoid collisions when multiple Rest
servers are running on the same machine.</td>
</tr>
+ <tr>
+ <td><h5>rest.cache.checkpoint-statistics.size</h5></td>
+ <td style="word-wrap: break-word;">1000</td>
+ <td>Integer</td>
+ <td>Maximum number of entries in the checkpoint statistics
cache.</td>
+ </tr>
+ <tr>
+ <td><h5>rest.cache.checkpoint-statistics.timeout</h5></td>
+ <td style="word-wrap: break-word;">3 s</td>
+ <td>Duration</td>
+ <td>Duration from write after which cached checkpoints statistics
are cleaned up. For backwards compatibility, if no value is configured, <code
class="highlighter-rouge">web.refresh-interval</code> will be used instead.</td>
+ </tr>
<tr>
<td><h5>rest.client.max-content-length</h5></td>
<td style="word-wrap: break-word;">104857600</td>
diff --git
a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
index 165be2f3415..d3c1a6baa90 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.description.Description;
import java.time.Duration;
import static org.apache.flink.configuration.ConfigOptions.key;
+import static org.apache.flink.configuration.description.TextElement.code;
import static org.apache.flink.configuration.description.TextElement.text;
/** Configuration parameters for REST communication. */
@@ -193,6 +194,29 @@ public class RestOptions {
+ "Lowering the thread priority will give
Flink's main components more CPU time whereas "
+ "increasing will allocate more time for
the REST server's processing.");
+ /** Duration from write, after which cached checkpoints statistics are
cleaned up. */
+ @Documentation.Section(Documentation.Sections.EXPERT_REST)
+ public static final ConfigOption<Duration>
CACHE_CHECKPOINT_STATISTICS_TIMEOUT =
+ key("rest.cache.checkpoint-statistics.timeout")
+ .durationType()
+
.defaultValue(Duration.ofMillis(WebOptions.REFRESH_INTERVAL.defaultValue()))
+ .withFallbackKeys(WebOptions.REFRESH_INTERVAL.key())
+ .withDescription(
+ Description.builder()
+ .text(
+ "Duration from write after which
cached checkpoints statistics are cleaned up. For backwards compatibility, if
no value is configured, %s will be used instead.",
+
code(WebOptions.REFRESH_INTERVAL.key()))
+ .build());
+
+ /** Maximum number of entries in the checkpoint statistics cache. */
+ @Documentation.Section(Documentation.Sections.EXPERT_REST)
+ public static final ConfigOption<Integer> CACHE_CHECKPOINT_STATISTICS_SIZE
=
+ key("rest.cache.checkpoint-statistics.size")
+ .intType()
+ .defaultValue(1000)
+ .withDescription(
+ "Maximum number of entries in the checkpoint
statistics cache.");
+
/** Enables the experimental flame graph feature. */
@Documentation.Section(Documentation.Sections.EXPERT_REST)
public static final ConfigOption<Boolean> ENABLE_FLAMEGRAPH =
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index d7d23256cbc..d2932562423 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -38,6 +38,7 @@ import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
@@ -937,6 +938,13 @@ public abstract class Dispatcher extends
FencedRpcEndpoint<DispatcherId>
.exceptionally(checkExecutionGraphStoreOnException);
}
+ @Override
+ public CompletableFuture<CheckpointStatsSnapshot> requestCheckpointStats(
+ JobID jobId, Time timeout) {
+ return performOperationOnJobMasterGateway(
+ jobId, gateway -> gateway.requestCheckpointStats(timeout));
+ }
+
@Override
public CompletableFuture<JobResult> requestJobResult(JobID jobId, Time
timeout) {
if (!jobManagerRunnerRegistry.isRegistered(jobId)) {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 6980824517a..b80cab39abf 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.blocklist.BlocklistContext;
import org.apache.flink.runtime.blocklist.BlocklistHandler;
import org.apache.flink.runtime.blocklist.BlocklistUtils;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -868,6 +869,11 @@ public class JobMaster extends
FencedRpcEndpoint<JobMasterId>
return CompletableFuture.completedFuture(schedulerNG.requestJob());
}
+ @Override
+ public CompletableFuture<CheckpointStatsSnapshot>
requestCheckpointStats(Time timeout) {
+ return
CompletableFuture.completedFuture(schedulerNG.requestCheckpointStats());
+ }
+
@Override
public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(
final CheckpointType checkpointType, final Time timeout) {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index e7becbe5189..6c1b79568a8 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -26,6 +26,7 @@ import org.apache.flink.core.execution.CheckpointType;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.blocklist.BlocklistListener;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorGateway;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -206,6 +207,14 @@ public interface JobMasterGateway
*/
CompletableFuture<ExecutionGraphInfo> requestJob(@RpcTimeout Time timeout);
+ /**
+ * Requests the {@link CheckpointStatsSnapshot} of the job.
+ *
+ * @param timeout for the rpc call
+ * @return Future which is completed with the {@link
CheckpointStatsSnapshot} of the job
+ */
+ CompletableFuture<CheckpointStatsSnapshot>
requestCheckpointStats(@RpcTimeout Time timeout);
+
/**
* Triggers taking a savepoint of the executed job.
*
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
index 9c8de50b21c..c60c6bfe7fd 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
@@ -21,17 +21,23 @@ package org.apache.flink.runtime.rest.handler;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.util.Preconditions;
import java.io.File;
+import java.time.Duration;
/** Configuration object containing values for the rest handler configuration.
*/
public class RestHandlerConfiguration {
private final long refreshInterval;
- private final int maxCheckpointStatisticCacheEntries;
+ private final int checkpointHistorySize;
+
+ private final Duration checkpointCacheExpireAfterWrite;
+
+ private final int checkpointCacheSize;
private final Time timeout;
@@ -45,7 +51,9 @@ public class RestHandlerConfiguration {
public RestHandlerConfiguration(
long refreshInterval,
- int maxCheckpointStatisticCacheEntries,
+ int checkpointHistorySize,
+ Duration checkpointCacheExpireAfterWrite,
+ int checkpointCacheSize,
Time timeout,
File webUiDir,
boolean webSubmitEnabled,
@@ -55,7 +63,9 @@ public class RestHandlerConfiguration {
refreshInterval > 0L, "The refresh interval (ms) should be
larger than 0.");
this.refreshInterval = refreshInterval;
- this.maxCheckpointStatisticCacheEntries =
maxCheckpointStatisticCacheEntries;
+ this.checkpointHistorySize = checkpointHistorySize;
+ this.checkpointCacheExpireAfterWrite = checkpointCacheExpireAfterWrite;
+ this.checkpointCacheSize = checkpointCacheSize;
this.timeout = Preconditions.checkNotNull(timeout);
this.webUiDir = Preconditions.checkNotNull(webUiDir);
@@ -68,8 +78,16 @@ public class RestHandlerConfiguration {
return refreshInterval;
}
- public int getMaxCheckpointStatisticCacheEntries() {
- return maxCheckpointStatisticCacheEntries;
+ public int getCheckpointHistorySize() {
+ return checkpointHistorySize;
+ }
+
+ public Duration getCheckpointCacheExpireAfterWrite() {
+ return checkpointCacheExpireAfterWrite;
+ }
+
+ public int getCheckpointCacheSize() {
+ return checkpointCacheSize;
}
public Time getTimeout() {
@@ -95,8 +113,14 @@ public class RestHandlerConfiguration {
public static RestHandlerConfiguration fromConfiguration(Configuration
configuration) {
final long refreshInterval =
configuration.getLong(WebOptions.REFRESH_INTERVAL);
- final int maxCheckpointStatisticCacheEntries =
+ final int checkpointHistorySize =
configuration.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE);
+ final Duration checkpointStatsSnapshotCacheExpireAfterWrite =
+ configuration
+
.getOptional(RestOptions.CACHE_CHECKPOINT_STATISTICS_TIMEOUT)
+ .orElse(Duration.ofMillis(refreshInterval));
+ final int checkpointStatsSnapshotCacheSize =
+
configuration.get(RestOptions.CACHE_CHECKPOINT_STATISTICS_SIZE);
final Time timeout =
Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT));
@@ -113,7 +137,9 @@ public class RestHandlerConfiguration {
return new RestHandlerConfiguration(
refreshInterval,
- maxCheckpointStatisticCacheEntries,
+ checkpointHistorySize,
+ checkpointStatsSnapshotCacheExpireAfterWrite,
+ checkpointStatsSnapshotCacheSize,
timeout,
webUiDir,
webSubmitEnabled,
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointHandler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointHandler.java
index 61a4480d64b..b8f86a9f885 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointHandler.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointHandler.java
@@ -18,15 +18,14 @@
package org.apache.flink.runtime.rest.handler.job.checkpoints;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
-import
org.apache.flink.runtime.rest.handler.job.AbstractAccessExecutionGraphHandler;
-import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointIdPathParameter;
@@ -35,9 +34,11 @@ import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.Preconditions;
+import org.apache.flink.shaded.guava31.com.google.common.cache.Cache;
import
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
/**
@@ -47,7 +48,7 @@ import java.util.concurrent.Executor;
*/
public abstract class AbstractCheckpointHandler<
R extends ResponseBody, M extends CheckpointMessageParameters>
- extends AbstractAccessExecutionGraphHandler<R, M> {
+ extends AbstractCheckpointStatsHandler<R, M> {
private final CheckpointStatsCache checkpointStatsCache;
@@ -56,29 +57,28 @@ public abstract class AbstractCheckpointHandler<
Time timeout,
Map<String, String> responseHeaders,
MessageHeaders<EmptyRequestBody, R, M> messageHeaders,
- ExecutionGraphCache executionGraphCache,
Executor executor,
+ Cache<JobID, CompletableFuture<CheckpointStatsSnapshot>>
checkpointStatsSnapshotCache,
CheckpointStatsCache checkpointStatsCache) {
super(
leaderRetriever,
timeout,
responseHeaders,
messageHeaders,
- executionGraphCache,
+ checkpointStatsSnapshotCache,
executor);
this.checkpointStatsCache =
Preconditions.checkNotNull(checkpointStatsCache);
}
@Override
- protected R handleRequest(
- HandlerRequest<EmptyRequestBody> request, AccessExecutionGraph
executionGraph)
+ protected R handleCheckpointStatsRequest(
+ HandlerRequest<EmptyRequestBody> request,
+ CheckpointStatsSnapshot checkpointStatsSnapshot)
throws RestHandlerException {
+ JobID jobId = request.getPathParameter(JobIDPathParameter.class);
final long checkpointId =
request.getPathParameter(CheckpointIdPathParameter.class);
- final CheckpointStatsSnapshot checkpointStatsSnapshot =
- executionGraph.getCheckpointStatsSnapshot();
-
if (checkpointStatsSnapshot != null) {
AbstractCheckpointStats checkpointStats =
checkpointStatsSnapshot.getHistory().getCheckpointById(checkpointId);
@@ -100,7 +100,7 @@ public abstract class AbstractCheckpointHandler<
}
} else {
throw new RestHandlerException(
- "Checkpointing was not enabled for job " +
executionGraph.getJobID() + '.',
+ "Checkpointing was not enabled for job " + jobId + '.',
HttpResponseStatus.NOT_FOUND);
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointStatsHandler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointStatsHandler.java
new file mode 100644
index 00000000000..60d009133f5
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointStatsHandler.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.checkpoints;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.rest.NotFoundException;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.apache.flink.shaded.guava31.com.google.common.cache.Cache;
+
+import javax.annotation.Nonnull;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+
+/**
+ * Abstract class for checkpoint handlers that will cache the {@link
CheckpointStatsSnapshot}
+ * object.
+ *
+ * @param <R> the response type
+ * @param <M> the message parameters
+ */
+@Internal
+public abstract class AbstractCheckpointStatsHandler<
+ R extends ResponseBody, M extends JobMessageParameters>
+ extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, R, M> {
+
+ private final Executor executor;
+ private final Cache<JobID, CompletableFuture<CheckpointStatsSnapshot>>
+ checkpointStatsSnapshotCache;
+
+ protected AbstractCheckpointStatsHandler(
+ GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+ Time timeout,
+ Map<String, String> responseHeaders,
+ MessageHeaders<EmptyRequestBody, R, M> messageHeaders,
+ Cache<JobID, CompletableFuture<CheckpointStatsSnapshot>>
checkpointStatsSnapshotCache,
+ Executor executor) {
+ super(leaderRetriever, timeout, responseHeaders, messageHeaders);
+ this.executor = executor;
+ this.checkpointStatsSnapshotCache = checkpointStatsSnapshotCache;
+ }
+
+ @Override
+ protected CompletableFuture<R> handleRequest(
+ @Nonnull HandlerRequest<EmptyRequestBody> request, @Nonnull
RestfulGateway gateway)
+ throws RestHandlerException {
+ JobID jobId = request.getPathParameter(JobIDPathParameter.class);
+
+ try {
+ return checkpointStatsSnapshotCache
+ .get(jobId, () -> gateway.requestCheckpointStats(jobId,
timeout))
+ .thenApplyAsync(
+ checkpointStatsSnapshot -> {
+ try {
+ return handleCheckpointStatsRequest(
+ request, checkpointStatsSnapshot);
+ } catch (RestHandlerException e) {
+ throw new CompletionException(e);
+ }
+ },
+ executor)
+ .exceptionally(
+ throwable -> {
+ throwable =
ExceptionUtils.stripCompletionException(throwable);
+ if (throwable instanceof
FlinkJobNotFoundException) {
+ throw new CompletionException(
+ new NotFoundException(
+ String.format("Job %s not
found", jobId),
+ throwable));
+ } else {
+ throw new CompletionException(throwable);
+ }
+ });
+ } catch (ExecutionException e) {
+ CompletableFuture<R> future = new CompletableFuture<>();
+ future.completeExceptionally(e);
+ return future;
+ }
+ }
+
+ protected abstract R handleCheckpointStatsRequest(
+ HandlerRequest<EmptyRequestBody> request,
+ CheckpointStatsSnapshot checkpointStatsSnapshot)
+ throws RestHandlerException;
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java
index cea6da26eb5..ac41bc71fe3 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java
@@ -18,13 +18,13 @@
package org.apache.flink.runtime.rest.handler.job.checkpoints;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
-import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
@@ -37,12 +37,15 @@ import
org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import
org.apache.flink.runtime.webmonitor.history.OnlyExecutionGraphJsonArchivist;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.shaded.guava31.com.google.common.cache.Cache;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
/** REST handler which returns the details for a checkpoint. */
@@ -56,16 +59,16 @@ public class CheckpointStatisticDetailsHandler
Map<String, String> responseHeaders,
MessageHeaders<EmptyRequestBody, CheckpointStatistics,
CheckpointMessageParameters>
messageHeaders,
- ExecutionGraphCache executionGraphCache,
Executor executor,
+ Cache<JobID, CompletableFuture<CheckpointStatsSnapshot>>
checkpointStatsSnapshotCache,
CheckpointStatsCache checkpointStatsCache) {
super(
leaderRetriever,
timeout,
responseHeaders,
messageHeaders,
- executionGraphCache,
executor,
+ checkpointStatsSnapshotCache,
checkpointStatsCache);
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java
index 7bc4c9214c1..6accdd40983 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.rest.handler.job.checkpoints;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
import org.apache.flink.runtime.checkpoint.CheckpointStatsCounts;
@@ -28,8 +29,6 @@ import
org.apache.flink.runtime.checkpoint.RestoredCheckpointStats;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
-import
org.apache.flink.runtime.rest.handler.job.AbstractAccessExecutionGraphHandler;
-import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
@@ -44,6 +43,7 @@ import
org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import
org.apache.flink.runtime.webmonitor.history.OnlyExecutionGraphJsonArchivist;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.shaded.guava31.com.google.common.cache.Cache;
import
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import java.io.IOException;
@@ -52,11 +52,12 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
/** Handler which serves the checkpoint statistics. */
public class CheckpointingStatisticsHandler
- extends AbstractAccessExecutionGraphHandler<CheckpointingStatistics,
JobMessageParameters>
+ extends AbstractCheckpointStatsHandler<CheckpointingStatistics,
JobMessageParameters>
implements OnlyExecutionGraphJsonArchivist {
public CheckpointingStatisticsHandler(
@@ -65,22 +66,23 @@ public class CheckpointingStatisticsHandler
Map<String, String> responseHeaders,
MessageHeaders<EmptyRequestBody, CheckpointingStatistics,
JobMessageParameters>
messageHeaders,
- ExecutionGraphCache executionGraphCache,
+ Cache<JobID, CompletableFuture<CheckpointStatsSnapshot>>
checkpointStatsSnapshotCache,
Executor executor) {
super(
leaderRetriever,
timeout,
responseHeaders,
messageHeaders,
- executionGraphCache,
+ checkpointStatsSnapshotCache,
executor);
}
@Override
- protected CheckpointingStatistics handleRequest(
- HandlerRequest<EmptyRequestBody> request, AccessExecutionGraph
executionGraph)
+ protected CheckpointingStatistics handleCheckpointStatsRequest(
+ HandlerRequest<EmptyRequestBody> request,
+ CheckpointStatsSnapshot checkpointStatsSnapshot)
throws RestHandlerException {
- return createCheckpointingStatistics(executionGraph);
+ return createCheckpointingStatistics(checkpointStatsSnapshot);
}
@Override
@@ -88,7 +90,7 @@ public class CheckpointingStatisticsHandler
throws IOException {
ResponseBody json;
try {
- json = createCheckpointingStatistics(graph);
+ json =
createCheckpointingStatistics(graph.getCheckpointStatsSnapshot());
} catch (RestHandlerException rhe) {
json = new ErrorResponseBody(rhe.getMessage());
}
@@ -100,10 +102,7 @@ public class CheckpointingStatisticsHandler
}
private static CheckpointingStatistics createCheckpointingStatistics(
- AccessExecutionGraph executionGraph) throws RestHandlerException {
- final CheckpointStatsSnapshot checkpointStatsSnapshot =
- executionGraph.getCheckpointStatsSnapshot();
-
+ CheckpointStatsSnapshot checkpointStatsSnapshot) throws
RestHandlerException {
if (checkpointStatsSnapshot == null) {
throw new RestHandlerException(
"Checkpointing has not been enabled.",
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java
index 3f5e0f16c9f..1f0b7e5ed1e 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.rest.handler.job.checkpoints;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
@@ -30,7 +31,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.NotFoundException;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
-import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
@@ -46,12 +46,15 @@ import
org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import
org.apache.flink.runtime.webmonitor.history.OnlyExecutionGraphJsonArchivist;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.shaded.guava31.com.google.common.cache.Cache;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
/** REST handler which serves checkpoint statistics for subtasks. */
@@ -69,16 +72,16 @@ public class TaskCheckpointStatisticDetailsHandler
TaskCheckpointStatisticsWithSubtaskDetails,
TaskCheckpointMessageParameters>
messageHeaders,
- ExecutionGraphCache executionGraphCache,
Executor executor,
+ Cache<JobID, CompletableFuture<CheckpointStatsSnapshot>>
checkpointStatsSnapshotCache,
CheckpointStatsCache checkpointStatsCache) {
super(
leaderRetriever,
timeout,
responseHeaders,
messageHeaders,
- executionGraphCache,
executor,
+ checkpointStatsSnapshotCache,
checkpointStatsCache);
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index bacb0fa7718..b64bcd9e44b 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -39,6 +39,7 @@ import
org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointScheduling;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
@@ -799,6 +800,12 @@ public abstract class SchedulerBase implements
SchedulerNG, CheckpointScheduling
ArchivedExecutionGraph.createFrom(executionGraph),
getExceptionHistory());
}
+ @Override
+ public CheckpointStatsSnapshot requestCheckpointStats() {
+ mainThreadExecutor.assertRunningInMainThread();
+ return executionGraph.getCheckpointStatsSnapshot();
+ }
+
@Override
public JobStatus requestJobStatus() {
return executionGraph.getState();
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
index 0da92a805cf..7887dc8abba 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
@@ -26,6 +26,7 @@ import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
@@ -93,6 +94,15 @@ public interface SchedulerNG extends GlobalFailureHandler,
AutoCloseableAsync {
ExecutionGraphInfo requestJob();
+ /**
+ * Returns the checkpoint statistics for a given job. Although the {@link
+ * CheckpointStatsSnapshot} is included in the {@link ExecutionGraphInfo},
this method is
+ * preferred to {@link SchedulerNG#requestJob()} because it is less
expensive.
+ *
+ * @return checkpoint statistics snapshot for job graph
+ */
+ CheckpointStatsSnapshot requestCheckpointStats();
+
JobStatus requestJobStatus();
JobDetails requestJobDetails();
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index ddd6fd5ea95..d4b9d65d403 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -39,6 +39,7 @@ import
org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointScheduling;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
@@ -593,6 +594,11 @@ public class AdaptiveScheduler
return new ExecutionGraphInfo(state.getJob(),
exceptionHistory.toArrayList());
}
+ @Override
+ public CheckpointStatsSnapshot requestCheckpointStats() {
+ return state.getJob().getCheckpointStatsSnapshot();
+ }
+
@Override
public void archiveFailure(RootExceptionHistoryEntry failure) {
exceptionHistory.add(failure);
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/NonLeaderRetrievalRestfulGateway.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/NonLeaderRetrievalRestfulGateway.java
index d0be629e0e4..28cc793c1c5 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/NonLeaderRetrievalRestfulGateway.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/NonLeaderRetrievalRestfulGateway.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.webmonitor;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.Acknowledge;
@@ -69,6 +70,12 @@ public class NonLeaderRetrievalRestfulGateway implements
RestfulGateway {
throw new UnsupportedOperationException(MESSAGE);
}
+ @Override
+ public CompletableFuture<CheckpointStatsSnapshot> requestCheckpointStats(
+ JobID jobId, Time timeout) {
+ throw new UnsupportedOperationException(MESSAGE);
+ }
+
@Override
public CompletableFuture<JobResult> requestJobResult(JobID jobId, Time
timeout) {
throw new UnsupportedOperationException(MESSAGE);
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
index 2d3b5363ca9..7aa246bd705 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.execution.CheckpointType;
import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.dispatcher.TriggerSavepointMode;
@@ -95,6 +96,16 @@ public interface RestfulGateway extends RpcGateway {
CompletableFuture<ExecutionGraphInfo> requestExecutionGraphInfo(
JobID jobId, @RpcTimeout Time timeout);
+ /**
+ * Requests the {@link CheckpointStatsSnapshot} containing checkpointing
information.
+ *
+ * @param jobId identifying the job whose {@link CheckpointStatsSnapshot}
is requested
+ * @param timeout for the asynchronous operation
+ * @return Future containing the {@link CheckpointStatsSnapshot} for the
given jobId
+ */
+ CompletableFuture<CheckpointStatsSnapshot> requestCheckpointStats(
+ JobID jobId, @RpcTimeout Time timeout);
+
/**
* Requests the {@link JobResult} of a job specified by the given jobId.
*
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index c653866b28b..7153f320ef9 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.webmonitor;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.AkkaOptions;
@@ -25,6 +26,7 @@ import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.blob.TransientBlobService;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElection;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
@@ -163,6 +165,8 @@ import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.concurrent.FutureUtils;
+import org.apache.flink.shaded.guava31.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
import javax.annotation.Nullable;
@@ -199,6 +203,8 @@ public class WebMonitorEndpoint<T extends RestfulGateway>
extends RestServerEndp
private final ExecutionGraphCache executionGraphCache;
private final CheckpointStatsCache checkpointStatsCache;
+ private final Cache<JobID, CompletableFuture<CheckpointStatsSnapshot>>
+ checkpointStatsSnapshotCache;
private final MetricFetcher metricFetcher;
@@ -235,7 +241,12 @@ public class WebMonitorEndpoint<T extends RestfulGateway>
extends RestServerEndp
this.executionGraphCache = executionGraphCache;
this.checkpointStatsCache =
- new
CheckpointStatsCache(restConfiguration.getMaxCheckpointStatisticCacheEntries());
+ new
CheckpointStatsCache(restConfiguration.getCheckpointHistorySize());
+ this.checkpointStatsSnapshotCache =
+ CacheBuilder.newBuilder()
+
.maximumSize(restConfiguration.getCheckpointCacheSize())
+
.expireAfterWrite(restConfiguration.getCheckpointCacheExpireAfterWrite())
+ .build();
this.metricFetcher = metricFetcher;
@@ -372,7 +383,7 @@ public class WebMonitorEndpoint<T extends RestfulGateway>
extends RestServerEndp
timeout,
responseHeaders,
CheckpointingStatisticsHeaders.getInstance(),
- executionGraphCache,
+ checkpointStatsSnapshotCache,
executor);
CheckpointStatisticDetailsHandler checkpointStatisticDetailsHandler =
@@ -381,8 +392,8 @@ public class WebMonitorEndpoint<T extends RestfulGateway>
extends RestServerEndp
timeout,
responseHeaders,
CheckpointStatisticDetailsHeaders.getInstance(),
- executionGraphCache,
executor,
+ checkpointStatsSnapshotCache,
checkpointStatsCache);
JobPlanHandler jobPlanHandler =
@@ -400,8 +411,8 @@ public class WebMonitorEndpoint<T extends RestfulGateway>
extends RestServerEndp
timeout,
responseHeaders,
TaskCheckpointStatisticsHeaders.getInstance(),
- executionGraphCache,
executor,
+ checkpointStatsSnapshotCache,
checkpointStatsCache);
JobExceptionsHandler jobExceptionsHandler =
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 8e95b5150df..d4f0ffc2b2e 100755
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
@@ -65,6 +66,7 @@ import
org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFact
import org.apache.flink.runtime.jobmaster.factories.JobMasterServiceFactory;
import
org.apache.flink.runtime.jobmaster.factories.JobMasterServiceProcessFactory;
import
org.apache.flink.runtime.jobmaster.factories.TestingJobMasterServiceFactory;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.leaderelection.LeaderElection;
import org.apache.flink.runtime.leaderelection.TestingLeaderElection;
@@ -655,6 +657,32 @@ public class DispatcherTest extends AbstractDispatcherTest
{
assertThat(completableFutureCompletableFuture.get(),
is(failedExecutionGraphInfo));
}
+ @Test
+ public void testRetrieveCheckpointStats() throws Exception {
+ CheckpointStatsSnapshot snapshot = CheckpointStatsSnapshot.empty();
+ TestingJobMasterGateway testingJobMasterGateway =
+ new TestingJobMasterGatewayBuilder()
+ .setCheckpointStatsSnapshotSupplier(
+ () ->
CompletableFuture.completedFuture(snapshot))
+ .build();
+
+ dispatcher =
+ createAndStartDispatcher(
+ heartbeatServices,
+ haServices,
+ new TestingJobMasterGatewayJobManagerRunnerFactory(
+ testingJobMasterGateway));
+ DispatcherGateway dispatcherGateway =
dispatcher.getSelfGateway(DispatcherGateway.class);
+
+ dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+
+ CompletableFuture<CheckpointStatsSnapshot> resultsFuture =
+ dispatcher.callAsyncInMainThread(
+ () -> dispatcher.requestCheckpointStats(jobId,
TIMEOUT));
+
Assertions.assertThat(resultsFuture).succeedsWithin(Duration.ofSeconds(1));
+ Assertions.assertThat(resultsFuture).isCompletedWithValue(snapshot);
+ }
+
@Test
public void testThrowExceptionIfJobExecutionResultNotFound() throws
Exception {
dispatcher =
@@ -1752,6 +1780,45 @@ public class DispatcherTest extends
AbstractDispatcherTest {
}
}
+ private static final class TestingJobMasterGatewayJobManagerRunnerFactory
+ extends TestingJobMasterServiceLeadershipRunnerFactory {
+ private final TestingJobMasterGateway testingJobMasterGateway;
+
+ private TestingJobMasterGatewayJobManagerRunnerFactory(
+ TestingJobMasterGateway testingJobMasterGateway) {
+ this.testingJobMasterGateway = testingJobMasterGateway;
+ }
+
+ @Override
+ public TestingJobManagerRunner createJobManagerRunner(
+ JobGraph jobGraph,
+ Configuration configuration,
+ RpcService rpcService,
+ HighAvailabilityServices highAvailabilityServices,
+ HeartbeatServices heartbeatServices,
+ JobManagerSharedServices jobManagerServices,
+ JobManagerJobMetricGroupFactory
jobManagerJobMetricGroupFactory,
+ FatalErrorHandler fatalErrorHandler,
+ Collection<FailureEnricher> failureEnrichers,
+ long initializationTimestamp)
+ throws Exception {
+ TestingJobManagerRunner runner =
+ super.createJobManagerRunner(
+ jobGraph,
+ configuration,
+ rpcService,
+ highAvailabilityServices,
+ heartbeatServices,
+ jobManagerServices,
+ jobManagerJobMetricGroupFactory,
+ fatalErrorHandler,
+ failureEnrichers,
+ initializationTimestamp);
+ runner.completeJobMasterGatewayFuture(testingJobMasterGateway);
+ return runner;
+ }
+ }
+
private static final class ExpectedJobIdJobManagerRunnerFactory
implements JobManagerRunnerFactory {
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 9d0b06a7277..00ef90ac807 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -42,6 +42,7 @@ import
org.apache.flink.runtime.blocklist.DefaultBlocklistHandler;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory;
@@ -2097,6 +2098,42 @@ class JobMasterTest {
}
}
+ @Test
+ void testRetrievingCheckpointStats() throws Exception {
+ // create savepoint data
+ final long savepointId = 42L;
+ final File savepointFile = createSavepoint(savepointId);
+
+ // set savepoint settings
+ final SavepointRestoreSettings savepointRestoreSettings =
+
SavepointRestoreSettings.forPath(savepointFile.getAbsolutePath(), true);
+ final JobGraph jobGraph =
createJobGraphWithCheckpointing(savepointRestoreSettings);
+
+ final StandaloneCompletedCheckpointStore completedCheckpointStore =
+ new StandaloneCompletedCheckpointStore(1);
+ final CheckpointRecoveryFactory testingCheckpointRecoveryFactory =
+ PerJobCheckpointRecoveryFactory.withoutCheckpointStoreRecovery(
+ maxCheckpoints -> completedCheckpointStore);
+
haServices.setCheckpointRecoveryFactory(testingCheckpointRecoveryFactory);
+
+ try (final JobMaster jobMaster =
+ new JobMasterBuilder(jobGraph, rpcService)
+ .withHighAvailabilityServices(haServices)
+ .createJobMaster()) {
+
+ // start the job master; NOTE(review): no slots are registered here, confirm
+ // that the adaptive scheduler still restores from the savepoint
+ jobMaster.start();
+
+ CheckpointStatsSnapshot checkpointStatsSnapshot =
+
jobMaster.getGateway().requestCheckpointStats(testingTimeout).get();
+
+ // assert that the checkpoint snapshot reflects the latest restored checkpoint
+
assertThat(checkpointStatsSnapshot.getLatestRestoredCheckpoint().getCheckpointId())
+ .isEqualTo(savepointId);
+ }
+ }
+
private TestingResourceManagerGateway createResourceManagerGateway(
CompletableFuture<Collection<BlockedNode>>
firstReceivedBlockedNodeFuture,
CompletableFuture<Collection<BlockedNode>>
secondReceivedBlockedNodeFuture,
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java
index dd049e05371..24c345f7355 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java
@@ -118,7 +118,7 @@ public class TestingJobManagerRunner implements
JobManagerRunner {
@Override
public boolean isInitialized() {
- throw new UnsupportedOperationException();
+ return true;
}
@Override
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
index bf4d49a91ec..77674bcef64 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
@@ -29,6 +29,7 @@ import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.runtime.blocklist.BlockedNode;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -131,6 +132,10 @@ public class TestingJobMasterGateway implements
JobMasterGateway {
@Nonnull private final Supplier<CompletableFuture<ExecutionGraphInfo>>
requestJobSupplier;
+ @Nonnull
+ private final Supplier<CompletableFuture<CheckpointStatsSnapshot>>
+ checkpointStatsSnapshotSupplier;
+
@Nonnull
private final TriFunction<String, Boolean, SavepointFormatType,
CompletableFuture<String>>
triggerSavepointFunction;
@@ -240,6 +245,9 @@ public class TestingJobMasterGateway implements
JobMasterGateway {
@Nonnull Function<ResourceID, CompletableFuture<Void>>
resourceManagerHeartbeatFunction,
@Nonnull Supplier<CompletableFuture<JobDetails>>
requestJobDetailsSupplier,
@Nonnull Supplier<CompletableFuture<ExecutionGraphInfo>>
requestJobSupplier,
+ @Nonnull
+ Supplier<CompletableFuture<CheckpointStatsSnapshot>>
+ checkpointStatsSnapshotSupplier,
@Nonnull
TriFunction<String, Boolean, SavepointFormatType,
CompletableFuture<String>>
triggerSavepointFunction,
@@ -320,6 +328,7 @@ public class TestingJobMasterGateway implements
JobMasterGateway {
this.resourceManagerHeartbeatFunction =
resourceManagerHeartbeatFunction;
this.requestJobDetailsSupplier = requestJobDetailsSupplier;
this.requestJobSupplier = requestJobSupplier;
+ this.checkpointStatsSnapshotSupplier = checkpointStatsSnapshotSupplier;
this.triggerSavepointFunction = triggerSavepointFunction;
this.triggerCheckpointFunction = triggerCheckpointFunction;
this.stopWithSavepointFunction = stopWithSavepointFunction;
@@ -417,6 +426,11 @@ public class TestingJobMasterGateway implements
JobMasterGateway {
return requestJobSupplier.get();
}
+ @Override
+ public CompletableFuture<CheckpointStatsSnapshot>
requestCheckpointStats(Time timeout) {
+ return checkpointStatsSnapshotSupplier.get();
+ }
+
@Override
public CompletableFuture<String> triggerSavepoint(
@Nullable final String targetDirectory,
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java
index 6238736c9e5..acf1a034a82 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java
@@ -27,6 +27,7 @@ import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.runtime.blocklist.BlockedNode;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -121,6 +122,8 @@ public class TestingJobMasterGatewayBuilder {
() -> FutureUtils.completedExceptionally(new
UnsupportedOperationException());
private Supplier<CompletableFuture<ExecutionGraphInfo>> requestJobSupplier
=
() -> FutureUtils.completedExceptionally(new
UnsupportedOperationException());
+ private Supplier<CompletableFuture<CheckpointStatsSnapshot>>
checkpointStatsSnapshotSupplier =
+ () -> FutureUtils.completedExceptionally(new
UnsupportedOperationException());
private TriFunction<String, Boolean, SavepointFormatType,
CompletableFuture<String>>
triggerSavepointFunction =
(targetDirectory, ignoredB, formatType) ->
@@ -289,6 +292,12 @@ public class TestingJobMasterGatewayBuilder {
return this;
}
+ public TestingJobMasterGatewayBuilder setCheckpointStatsSnapshotSupplier(
+ Supplier<CompletableFuture<CheckpointStatsSnapshot>>
checkpointStatsSnapshotSupplier) {
+ this.checkpointStatsSnapshotSupplier = checkpointStatsSnapshotSupplier;
+ return this;
+ }
+
public TestingJobMasterGatewayBuilder setTriggerSavepointFunction(
TriFunction<String, Boolean, SavepointFormatType,
CompletableFuture<String>>
triggerSavepointFunction) {
@@ -439,6 +448,7 @@ public class TestingJobMasterGatewayBuilder {
resourceManagerHeartbeatFunction,
requestJobDetailsSupplier,
requestJobSupplier,
+ checkpointStatsSnapshotSupplier,
triggerSavepointFunction,
triggerCheckpointFunction,
stopWithSavepointFunction,
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/RestHandlerConfigurationTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/RestHandlerConfigurationTest.java
index fb01fa6a66c..217130844b4 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/RestHandlerConfigurationTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/RestHandlerConfigurationTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.rest.handler;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.SchedulerExecutionMode;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.util.TestLoggerExtension;
@@ -29,6 +30,8 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
+import java.time.Duration;
+
import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link RestHandlerConfiguration}. */
@@ -103,4 +106,39 @@ class RestHandlerConfigurationTest {
RestHandlerConfiguration.fromConfiguration(config);
assertThat(restHandlerConfiguration.isWebCancelEnabled()).isEqualTo(webCancelEnabled);
}
+
+ @Test
+ public void testCheckpointCacheExpireAfterWrite() {
+ final Duration testDuration = Duration.ofMillis(100L);
+ final Configuration config = new Configuration();
+ config.set(RestOptions.CACHE_CHECKPOINT_STATISTICS_TIMEOUT,
testDuration);
+
+ RestHandlerConfiguration restHandlerConfiguration =
+ RestHandlerConfiguration.fromConfiguration(config);
+
assertThat(restHandlerConfiguration.getCheckpointCacheExpireAfterWrite())
+ .isEqualTo(testDuration);
+ }
+
+ @Test
+ public void testCheckpointCacheExpiryFallbackToRefreshInterval() {
+ final long refreshInterval = 1000L;
+ final Configuration config = new Configuration();
+ config.set(WebOptions.REFRESH_INTERVAL, refreshInterval);
+
+ RestHandlerConfiguration restHandlerConfiguration =
+ RestHandlerConfiguration.fromConfiguration(config);
+
assertThat(restHandlerConfiguration.getCheckpointCacheExpireAfterWrite())
+ .isEqualTo(Duration.ofMillis(1000L));
+ }
+
+ @Test
+ public void testCheckpointCacheSize() {
+ final int testCacheSize = 50;
+ final Configuration config = new Configuration();
+ config.set(RestOptions.CACHE_CHECKPOINT_STATISTICS_SIZE,
testCacheSize);
+
+ RestHandlerConfiguration restHandlerConfiguration =
+ RestHandlerConfiguration.fromConfiguration(config);
+
assertThat(restHandlerConfiguration.getCheckpointCacheSize()).isEqualTo(testCacheSize);
+ }
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointStatsHandlerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointStatsHandlerTest.java
new file mode 100644
index 00000000000..f18cf985c5c
--- /dev/null
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointStatsHandlerTest.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.checkpoints;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics;
+import
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
+import org.apache.flink.runtime.rest.messages.checkpoints.StatsSummaryDto;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.concurrent.Executors;
+
+import org.apache.flink.shaded.guava31.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder;
+import
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static
org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType;
+
+/** Test class for {@link AbstractCheckpointStatsHandler}. */
+public class AbstractCheckpointStatsHandlerTest extends TestLogger {
+
+ private static final Time TIMEOUT = Time.seconds(10);
+
+ private static final JobID JOB_ID = new JobID();
+
+ private static final CheckpointStatsTracker checkpointStatsTracker =
+ new CheckpointStatsTracker(
+ 10,
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup());
+
+    /**
+     * Sends two requests for the same job through gateways that return different snapshot
+     * futures. Each response must equal the handler's canned statistics, and the snapshot handed
+     * to {@code handleCheckpointStatsRequest} must be the one served by the gateway used for that
+     * request.
+     */
+    @Test
+    public void testRetrieveSnapshotFromCache() throws Exception {
+        GatewayRetriever<RestfulGateway> leaderRetriever =
+                () -> CompletableFuture.completedFuture(null);
+        CheckpointingStatistics checkpointingStatistics = getTestCheckpointingStatistics();
+        CheckpointStatsSnapshot checkpointStatsSnapshot1 = getTestCheckpointStatsSnapshot();
+
+        // Create a passthrough cache so the latest object will always be returned. A maximum
+        // size of zero makes Guava evict every entry immediately after it has been loaded; with
+        // the builder's defaults nothing is ever evicted, so a handler that consults the cache
+        // would keep being served the first snapshot.
+        Cache<JobID, CompletableFuture<CheckpointStatsSnapshot>> cache =
+                CacheBuilder.newBuilder().maximumSize(0).build();
+
+        try (RecordingCheckpointStatsHandler checkpointStatsHandler =
+                new RecordingCheckpointStatsHandler(
+                        leaderRetriever,
+                        TIMEOUT,
+                        Collections.emptyMap(),
+                        CheckpointingStatisticsHeaders.getInstance(),
+                        cache,
+                        Executors.directExecutor(),
+                        checkpointingStatistics)) {
+            RestfulGateway functioningRestfulGateway =
+                    new TestingRestfulGateway.Builder()
+                            .setRequestCheckpointStatsSnapshotFunction(
+                                    jobID ->
+                                            CompletableFuture.completedFuture(
+                                                    checkpointStatsSnapshot1))
+                            .build();
+            HandlerRequest<EmptyRequestBody> request =
+                    HandlerRequest.resolveParametersAndCreate(
+                            EmptyRequestBody.getInstance(),
+                            new JobMessageParameters(),
+                            Collections.singletonMap(JobIDPathParameter.KEY, JOB_ID.toString()),
+                            Collections.emptyMap(),
+                            Collections.emptyList());
+
+            assertThat(
+                            checkpointStatsHandler
+                                    .handleRequest(request, functioningRestfulGateway)
+                                    .get())
+                    .usingRecursiveComparison()
+                    .isEqualTo(checkpointingStatistics);
+            assertThat(checkpointStatsHandler.getStoredCheckpointStats())
+                    .isEqualTo(checkpointStatsSnapshot1);
+
+            // Refresh the checkpoint stats data
+            // NOTE(review): both snapshots are taken from the same shared tracker; confirm that
+            // they are distinguishable objects, otherwise the final assertion cannot tell a
+            // fresh lookup from a stale one.
+            CheckpointStatsSnapshot checkpointStatsSnapshot2 = getTestCheckpointStatsSnapshot();
+            RestfulGateway refreshedRestfulGateway =
+                    new TestingRestfulGateway.Builder()
+                            .setRequestCheckpointStatsSnapshotFunction(
+                                    jobID ->
+                                            CompletableFuture.completedFuture(
+                                                    checkpointStatsSnapshot2))
+                            .build();
+            assertThat(checkpointStatsHandler.handleRequest(request, refreshedRestfulGateway).get())
+                    .usingRecursiveComparison()
+                    .isEqualTo(checkpointingStatistics);
+            assertThat(checkpointStatsHandler.getStoredCheckpointStats())
+                    .isEqualTo(checkpointStatsSnapshot2);
+        }
+    }
+
+    /**
+     * A {@link RestHandlerException} thrown from {@code handleCheckpointStatsRequest} must show
+     * up as the cause of the {@link ExecutionException} raised when the response is awaited.
+     */
+    @Test
+    public void testRestExceptionPassedThrough() throws Exception {
+        final GatewayRetriever<RestfulGateway> retriever =
+                () -> CompletableFuture.completedFuture(null);
+        final CheckpointStatsSnapshot snapshot = getTestCheckpointStatsSnapshot();
+        final RestHandlerException expectedFailure =
+                new RestHandlerException(
+                        "some exception thrown", HttpResponseStatus.INTERNAL_SERVER_ERROR);
+
+        try (ThrowingCheckpointStatsHandler handler =
+                new ThrowingCheckpointStatsHandler(
+                        retriever,
+                        TIMEOUT,
+                        Collections.emptyMap(),
+                        CheckpointingStatisticsHeaders.getInstance(),
+                        CacheBuilder.newBuilder().build(),
+                        Executors.directExecutor(),
+                        expectedFailure)) {
+            // The gateway itself succeeds; only the handler's own processing step fails.
+            final RestfulGateway gateway =
+                    new TestingRestfulGateway.Builder()
+                            .setRequestCheckpointStatsSnapshotFunction(
+                                    ignoredJobId -> CompletableFuture.completedFuture(snapshot))
+                            .build();
+            final HandlerRequest<EmptyRequestBody> handlerRequest =
+                    HandlerRequest.resolveParametersAndCreate(
+                            EmptyRequestBody.getInstance(),
+                            new JobMessageParameters(),
+                            Collections.singletonMap(JobIDPathParameter.KEY, JOB_ID.toString()),
+                            Collections.emptyMap(),
+                            Collections.emptyList());
+
+            assertThatExceptionOfType(ExecutionException.class)
+                    .isThrownBy(() -> handler.handleRequest(handlerRequest, gateway).get())
+                    .withCause(expectedFailure);
+        }
+    }
+
+    /**
+     * When the gateway answers the snapshot request with a future that failed with a {@link
+     * FlinkJobNotFoundException}, the handler's response future must fail as well and mention
+     * the missing job in its stack trace.
+     */
+    @Test
+    public void testFlinkJobNotFoundException() throws Exception {
+        GatewayRetriever<RestfulGateway> leaderRetriever =
+                () -> CompletableFuture.completedFuture(null);
+        CompletableFuture<CheckpointStatsSnapshot> failedFuture = new CompletableFuture<>();
+        failedFuture.completeExceptionally(new FlinkJobNotFoundException(JOB_ID));
+
+        // No canned response is supplied (null): the request is expected to fail instead of
+        // producing a value.
+        try (RecordingCheckpointStatsHandler checkpointStatsHandler =
+                new RecordingCheckpointStatsHandler(
+                        leaderRetriever,
+                        TIMEOUT,
+                        Collections.emptyMap(),
+                        CheckpointingStatisticsHeaders.getInstance(),
+                        CacheBuilder.newBuilder().build(),
+                        Executors.directExecutor(),
+                        null)) {
+            RestfulGateway restfulGateway =
+                    new TestingRestfulGateway.Builder()
+                            .setRequestCheckpointStatsSnapshotFunction(jobID -> failedFuture)
+                            .build();
+            HandlerRequest<EmptyRequestBody> request =
+                    HandlerRequest.resolveParametersAndCreate(
+                            EmptyRequestBody.getInstance(),
+                            new JobMessageParameters(),
+                            Collections.singletonMap(JobIDPathParameter.KEY, JOB_ID.toString()),
+                            Collections.emptyMap(),
+                            Collections.emptyList());
+
+            assertThat(checkpointStatsHandler.handleRequest(request, restfulGateway))
+                    .isCompletedExceptionally()
+                    .failsWithin(Duration.ofSeconds(1))
+                    .withThrowableOfType(ExecutionException.class)
+                    .withStackTraceContaining("Job %s not found", JOB_ID);
+        }
+    }
+
+ private static CheckpointStatsSnapshot getTestCheckpointStatsSnapshot() {
+ return checkpointStatsTracker.createSnapshot();
+ }
+
+    /**
+     * Builds the fixed {@link CheckpointingStatistics} instance that the recording handler
+     * returns: arbitrary counts and summaries, no latest checkpoints and an empty history.
+     */
+    private CheckpointingStatistics getTestCheckpointingStatistics() {
+        return new CheckpointingStatistics(
+                new CheckpointingStatistics.Counts(1, 2, 3, 4, 5),
+                new CheckpointingStatistics.Summary(
+                        new StatsSummaryDto(1L, 1L, 1L, 0, 0, 0, 0, 0),
+                        new StatsSummaryDto(1L, 1L, 1L, 0, 0, 0, 0, 0),
+                        new StatsSummaryDto(2L, 2L, 2L, 0, 0, 0, 0, 0),
+                        new StatsSummaryDto(3L, 3L, 3L, 0, 0, 0, 0, 0),
+                        new StatsSummaryDto(4L, 4L, 4L, 0, 0, 0, 0, 0),
+                        new StatsSummaryDto(5L, 5L, 5L, 0, 0, 0, 0, 0)),
+                new CheckpointingStatistics.LatestCheckpoints(null, null, null, null),
+                Collections.emptyList());
+    }
+
+    /**
+     * Handler that remembers the snapshot passed to its most recent {@link
+     * #handleCheckpointStatsRequest} call and always answers with a preset response.
+     */
+    private static class RecordingCheckpointStatsHandler
+            extends AbstractCheckpointStatsHandler<CheckpointingStatistics, JobMessageParameters> {
+
+        /** Snapshot seen by the latest request; stays {@code null} until one is handled. */
+        private CheckpointStatsSnapshot storedCheckpointStats;
+
+        /** Response returned for every request. */
+        private final CheckpointingStatistics returnValue;
+
+        protected RecordingCheckpointStatsHandler(
+                GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+                Time timeout,
+                Map<String, String> responseHeaders,
+                MessageHeaders<EmptyRequestBody, CheckpointingStatistics, JobMessageParameters>
+                        messageHeaders,
+                Cache<JobID, CompletableFuture<CheckpointStatsSnapshot>>
+                        checkpointStatsSnapshotCache,
+                Executor executor,
+                CheckpointingStatistics returnValue) {
+            super(
+                    leaderRetriever,
+                    timeout,
+                    responseHeaders,
+                    messageHeaders,
+                    checkpointStatsSnapshotCache,
+                    executor);
+            this.returnValue = returnValue;
+        }
+
+        public CheckpointStatsSnapshot getStoredCheckpointStats() {
+            return this.storedCheckpointStats;
+        }
+
+        @Override
+        protected CheckpointingStatistics handleCheckpointStatsRequest(
+                HandlerRequest<EmptyRequestBody> request,
+                CheckpointStatsSnapshot checkpointStatsSnapshot)
+                throws RestHandlerException {
+            // Record first, then answer with the canned value.
+            this.storedCheckpointStats = checkpointStatsSnapshot;
+            return this.returnValue;
+        }
+    }
+
+    /**
+     * Handler whose {@link #handleCheckpointStatsRequest} never produces a response and instead
+     * throws the {@link RestHandlerException} it was constructed with.
+     */
+    private static class ThrowingCheckpointStatsHandler
+            extends AbstractCheckpointStatsHandler<CheckpointingStatistics, JobMessageParameters> {
+
+        /** Failure raised for every request. */
+        private final RestHandlerException exception;
+
+        protected ThrowingCheckpointStatsHandler(
+                GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+                Time timeout,
+                Map<String, String> responseHeaders,
+                MessageHeaders<EmptyRequestBody, CheckpointingStatistics, JobMessageParameters>
+                        messageHeaders,
+                Cache<JobID, CompletableFuture<CheckpointStatsSnapshot>>
+                        checkpointStatsSnapshotCache,
+                Executor executor,
+                RestHandlerException exception) {
+            super(
+                    leaderRetriever,
+                    timeout,
+                    responseHeaders,
+                    messageHeaders,
+                    checkpointStatsSnapshotCache,
+                    executor);
+            this.exception = exception;
+        }
+
+        @Override
+        protected CheckpointingStatistics handleCheckpointStatsRequest(
+                HandlerRequest<EmptyRequestBody> request,
+                CheckpointStatsSnapshot checkpointStatsSnapshot)
+                throws RestHandlerException {
+            // Both arguments are ignored on purpose.
+            throw this.exception;
+        }
+    }
+}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java
index 9b774b7ab5a..801689a3aa7 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java
@@ -25,6 +25,7 @@ import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.execution.ExecutionState;
@@ -144,6 +145,12 @@ public class TestingSchedulerNG implements SchedulerNG {
return null;
}
+    /**
+     * Calls {@code failOperation()} before anything else; {@code null} is returned only if that
+     * call returns normally.
+     */
+    @Override
+    public CheckpointStatsSnapshot requestCheckpointStats() {
+        failOperation();
+        final CheckpointStatsSnapshot noSnapshot = null;
+        return noSnapshot;
+    }
+
@Override
public JobStatus requestJobStatus() {
return JobStatus.CREATED;
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java
index 7136620a907..08b988e2d0f 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.execution.CheckpointType;
import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
@@ -120,6 +121,8 @@ public final class TestingDispatcherGateway extends
TestingRestfulGateway
Function<JobID, CompletableFuture<ArchivedExecutionGraph>>
requestJobFunction,
Function<JobID, CompletableFuture<ExecutionGraphInfo>>
requestExecutionGraphInfoFunction,
+ Function<JobID, CompletableFuture<CheckpointStatsSnapshot>>
+ requestCheckpointStatsSnapshotFunction,
Function<JobID, CompletableFuture<JobResult>>
requestJobResultFunction,
Function<JobID, CompletableFuture<JobStatus>>
requestJobStatusFunction,
Supplier<CompletableFuture<MultipleJobsDetails>>
requestMultipleJobDetailsSupplier,
@@ -173,6 +176,7 @@ public final class TestingDispatcherGateway extends
TestingRestfulGateway
cancelJobFunction,
requestJobFunction,
requestExecutionGraphInfoFunction,
+ requestCheckpointStatsSnapshotFunction,
requestJobResultFunction,
requestJobStatusFunction,
requestMultipleJobDetailsSupplier,
@@ -355,6 +359,7 @@ public final class TestingDispatcherGateway extends
TestingRestfulGateway
cancelJobFunction,
requestJobFunction,
requestExecutionGraphInfoFunction,
+ requestCheckpointStatsSnapshotFunction,
requestJobResultFunction,
requestJobStatusFunction,
requestMultipleJobDetailsSupplier,
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
index 997a1fe6865..51270543b39 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.execution.CheckpointType;
import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.dispatcher.TriggerSavepointMode;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
@@ -65,6 +66,10 @@ public class TestingRestfulGateway implements RestfulGateway
{
DEFAULT_REQUEST_EXECUTION_GRAPH_INFO =
jobId ->
FutureUtils.completedExceptionally(new
UnsupportedOperationException());
+ static final Function<JobID, CompletableFuture<CheckpointStatsSnapshot>>
+ DEFAULT_REQUEST_CHECKPOINT_STATS_SNAPSHOT =
+ jobId ->
+ FutureUtils.completedExceptionally(new
UnsupportedOperationException());
static final Function<JobID, CompletableFuture<JobStatus>>
DEFAULT_REQUEST_JOB_STATUS_FUNCTION =
jobId -> CompletableFuture.completedFuture(JobStatus.RUNNING);
static final Supplier<CompletableFuture<MultipleJobsDetails>>
@@ -148,6 +153,9 @@ public class TestingRestfulGateway implements
RestfulGateway {
protected Function<JobID, CompletableFuture<ExecutionGraphInfo>>
requestExecutionGraphInfoFunction;
+ protected Function<JobID, CompletableFuture<CheckpointStatsSnapshot>>
+ requestCheckpointStatsSnapshotFunction;
+
protected Function<JobID, CompletableFuture<JobResult>>
requestJobResultFunction;
protected Function<JobID, CompletableFuture<JobStatus>>
requestJobStatusFunction;
@@ -202,6 +210,7 @@ public class TestingRestfulGateway implements
RestfulGateway {
DEFAULT_CANCEL_JOB_FUNCTION,
DEFAULT_REQUEST_JOB_FUNCTION,
DEFAULT_REQUEST_EXECUTION_GRAPH_INFO,
+ DEFAULT_REQUEST_CHECKPOINT_STATS_SNAPSHOT,
DEFAULT_REQUEST_JOB_RESULT_FUNCTION,
DEFAULT_REQUEST_JOB_STATUS_FUNCTION,
DEFAULT_REQUEST_MULTIPLE_JOB_DETAILS_SUPPLIER,
@@ -225,6 +234,8 @@ public class TestingRestfulGateway implements
RestfulGateway {
Function<JobID, CompletableFuture<ArchivedExecutionGraph>>
requestJobFunction,
Function<JobID, CompletableFuture<ExecutionGraphInfo>>
requestExecutionGraphInfoFunction,
+ Function<JobID, CompletableFuture<CheckpointStatsSnapshot>>
+ requestCheckpointStatsSnapshotFunction,
Function<JobID, CompletableFuture<JobResult>>
requestJobResultFunction,
Function<JobID, CompletableFuture<JobStatus>>
requestJobStatusFunction,
Supplier<CompletableFuture<MultipleJobsDetails>>
requestMultipleJobDetailsSupplier,
@@ -264,6 +275,7 @@ public class TestingRestfulGateway implements
RestfulGateway {
this.cancelJobFunction = cancelJobFunction;
this.requestJobFunction = requestJobFunction;
this.requestExecutionGraphInfoFunction =
requestExecutionGraphInfoFunction;
+ this.requestCheckpointStatsSnapshotFunction =
requestCheckpointStatsSnapshotFunction;
this.requestJobResultFunction = requestJobResultFunction;
this.requestJobStatusFunction = requestJobStatusFunction;
this.requestMultipleJobDetailsSupplier =
requestMultipleJobDetailsSupplier;
@@ -304,6 +316,12 @@ public class TestingRestfulGateway implements
RestfulGateway {
return requestExecutionGraphInfoFunction.apply(jobId);
}
+    /**
+     * Delegates to the configured snapshot function for the given job; the {@code timeout}
+     * argument is not used.
+     */
+    @Override
+    public CompletableFuture<CheckpointStatsSnapshot> requestCheckpointStats(
+            JobID jobId, Time timeout) {
+        final CompletableFuture<CheckpointStatsSnapshot> snapshotFuture =
+                requestCheckpointStatsSnapshotFunction.apply(jobId);
+        return snapshotFuture;
+    }
+
@Override
public CompletableFuture<JobResult> requestJobResult(JobID jobId, Time
timeout) {
return requestJobResultFunction.apply(jobId);
@@ -410,6 +428,8 @@ public class TestingRestfulGateway implements
RestfulGateway {
protected Function<JobID, CompletableFuture<ArchivedExecutionGraph>>
requestJobFunction;
protected Function<JobID, CompletableFuture<ExecutionGraphInfo>>
requestExecutionGraphInfoFunction;
+ protected Function<JobID, CompletableFuture<CheckpointStatsSnapshot>>
+ requestCheckpointStatsSnapshotFunction;
protected Function<JobID, CompletableFuture<JobResult>>
requestJobResultFunction;
protected Function<JobID, CompletableFuture<JobStatus>>
requestJobStatusFunction;
protected Supplier<CompletableFuture<MultipleJobsDetails>>
@@ -452,6 +472,7 @@ public class TestingRestfulGateway implements
RestfulGateway {
cancelJobFunction = DEFAULT_CANCEL_JOB_FUNCTION;
requestJobFunction = DEFAULT_REQUEST_JOB_FUNCTION;
requestExecutionGraphInfoFunction =
DEFAULT_REQUEST_EXECUTION_GRAPH_INFO;
+ requestCheckpointStatsSnapshotFunction =
DEFAULT_REQUEST_CHECKPOINT_STATS_SNAPSHOT;
requestJobResultFunction = DEFAULT_REQUEST_JOB_RESULT_FUNCTION;
requestJobStatusFunction = DEFAULT_REQUEST_JOB_STATUS_FUNCTION;
requestMultipleJobDetailsSupplier =
DEFAULT_REQUEST_MULTIPLE_JOB_DETAILS_SUPPLIER;
@@ -499,6 +520,13 @@ public class TestingRestfulGateway implements
RestfulGateway {
return self();
}
+        /**
+         * Replaces the function that answers checkpoint stats snapshot requests.
+         *
+         * @param requestCheckpointStatsSnapshotFunction function mapping a job id to the future
+         *     returned for that job
+         * @return the result of {@code self()}, so that calls can be chained
+         */
+        public T setRequestCheckpointStatsSnapshotFunction(
+                Function<JobID, CompletableFuture<CheckpointStatsSnapshot>>
+                        requestCheckpointStatsSnapshotFunction) {
+            this.requestCheckpointStatsSnapshotFunction = requestCheckpointStatsSnapshotFunction;
+            final T builder = self();
+            return builder;
+        }
+
public T setRequestJobResultFunction(
Function<JobID, CompletableFuture<JobResult>>
requestJobResultFunction) {
this.requestJobResultFunction = requestJobResultFunction;
@@ -625,6 +653,7 @@ public class TestingRestfulGateway implements
RestfulGateway {
cancelJobFunction,
requestJobFunction,
requestExecutionGraphInfoFunction,
+ requestCheckpointStatsSnapshotFunction,
requestJobResultFunction,
requestJobStatusFunction,
requestMultipleJobDetailsSupplier,