[FLINK-7709] Add CheckpointStatisticDetailsHandler for new REST endpoint Disable failing when not all creator properties are known
Move CheckpointStatsCache out of legacy package; Remove unused CheckpointingStatistics#generateCheckpointStatistics method Remove JsonInclude.Include.NON_NULL from CheckpointStatistics; Pull null check out of CheckpointStatistics#generateCheckpointStatistics; Make CheckpointStatistics#checkpointStatisticcsPerTask non nullable; Add fail on missing creator property This closes #4763. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0a286d0f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0a286d0f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0a286d0f Branch: refs/heads/master Commit: 0a286d0ff98afa68034daff4634f526eaaf97897 Parents: 6b3fdc2 Author: Till Rohrmann <[email protected]> Authored: Mon Oct 2 19:39:38 2017 +0200 Committer: Till Rohrmann <[email protected]> Committed: Tue Oct 10 17:34:14 2017 +0200 ---------------------------------------------------------------------- .../runtime/webmonitor/WebRuntimeMonitor.java | 2 +- .../dispatcher/DispatcherRestEndpoint.java | 29 +- .../rest/handler/RestHandlerConfiguration.java | 18 +- .../job/AbstractExecutionGraphHandler.java | 10 +- .../rest/handler/job/JobConfigHandler.java | 5 +- .../checkpoints/AbstractCheckpointHandler.java | 91 +++ .../checkpoints/CheckpointConfigHandler.java | 7 +- .../CheckpointStatisticDetailsHandler.java | 54 ++ .../CheckpointStatisticsHandler.java | 181 ----- .../job/checkpoints/CheckpointStatsCache.java | 81 ++ .../CheckpointingStatisticsHandler.java | 153 ++++ .../checkpoints/CheckpointConfigHandler.java | 2 +- .../checkpoints/CheckpointStatsCache.java | 81 -- .../CheckpointStatsDetailsHandler.java | 1 + .../CheckpointStatsDetailsSubtasksHandler.java | 1 + .../checkpoints/CheckpointStatsHandler.java | 81 +- .../rest/messages/CheckpointConfigHeaders.java | 70 -- .../rest/messages/CheckpointConfigInfo.java | 151 ---- .../rest/messages/CheckpointStatistics.java | 763 ------------------- .../messages/CheckpointStatisticsHeaders.java | 68 -- .../rest/messages/JobMessageParameters.java | 2 +- .../checkpoints/CheckpointConfigHeaders.java | 73 ++ .../checkpoints/CheckpointConfigInfo.java | 152 ++++ .../checkpoints/CheckpointIdPathParameter.java | 48 ++ .../CheckpointMessageParameters.java | 38 + .../CheckpointStatisticDetailsHeaders.java | 72 ++ .../checkpoints/CheckpointStatistics.java | 537 +++++++++++++ .../checkpoints/CheckpointingStatistics.java | 478 ++++++++++++ .../CheckpointingStatisticsHeaders.java | 71 ++ .../messages/json/JobVertexIDDeserializer.java | 37 + .../messages/json/JobVertexIDSerializer.java | 44 ++ .../checkpoints/CheckpointStatsCacheTest.java | 1 + .../CheckpointStatsDetailsHandlerTest.java | 1 + ...heckpointStatsSubtaskDetailsHandlerTest.java | 1 + .../messages/CheckpointConfigInfoTest.java | 2 +- .../messages/CheckpointStatisticsTest.java | 104 --- .../messages/CheckpointingStatisticsTest.java | 134 ++++ 37 files changed, 2164 insertions(+), 1480 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java index 0bf6552..1a6178f 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.net.SSLUtils; import org.apache.flink.runtime.rest.handler.WebHandler; +import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache; import org.apache.flink.runtime.rest.handler.legacy.ClusterConfigHandler; import org.apache.flink.runtime.rest.handler.legacy.ClusterOverviewHandler; import org.apache.flink.runtime.rest.handler.legacy.ConstantTextHandler; @@ -58,7 +59,6 @@ import org.apache.flink.runtime.rest.handler.legacy.TaskManagersHandler; import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker; import org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSampleCoordinator; import org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointConfigHandler; -import org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointStatsCache; import org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointStatsDetailsHandler; import org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointStatsDetailsSubtasksHandler; import org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointStatsHandler; http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java index d64e649..2a2d9be 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java @@ -32,7 +32,9 @@ import org.apache.flink.runtime.rest.handler.job.JobConfigHandler; import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler; import org.apache.flink.runtime.rest.handler.job.JobTerminationHandler; import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler; -import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticsHandler; +import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticDetailsHandler; +import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache; +import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler; import org.apache.flink.runtime.rest.handler.legacy.ClusterConfigHandler; import org.apache.flink.runtime.rest.handler.legacy.ClusterOverviewHandler; import org.apache.flink.runtime.rest.handler.legacy.CurrentJobsOverviewHandler; @@ -43,8 +45,6 @@ import org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpeci import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterConfigurationInfo; import org.apache.flink.runtime.rest.handler.legacy.messages.DashboardConfiguration; import org.apache.flink.runtime.rest.handler.legacy.messages.StatusOverviewWithVersion; -import org.apache.flink.runtime.rest.messages.CheckpointConfigHeaders; -import org.apache.flink.runtime.rest.messages.CheckpointStatisticsHeaders; import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfoHeaders; import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders; import org.apache.flink.runtime.rest.messages.CurrentJobsOverviewHandlerHeaders; @@ -52,6 +52,9 @@ import org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders; import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; import org.apache.flink.runtime.rest.messages.JobConfigHeaders; import org.apache.flink.runtime.rest.messages.JobTerminationHeaders; +import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigHeaders; +import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatisticDetailsHeaders; +import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders; import org.apache.flink.runtime.webmonitor.WebMonitorUtils; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import org.apache.flink.util.FileUtils; @@ -78,6 +81,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint { private final Executor executor; private final ExecutionGraphCache executionGraphCache; + private final CheckpointStatsCache checkpointStatsCache; public DispatcherRestEndpoint( RestServerEndpointConfiguration endpointConfiguration, @@ -94,6 +98,9 @@ public class DispatcherRestEndpoint extends RestServerEndpoint { this.executionGraphCache = new ExecutionGraphCache( restConfiguration.getTimeout(), Time.milliseconds(restConfiguration.getRefreshInterval())); + + this.checkpointStatsCache = new CheckpointStatsCache( + restConfiguration.getMaxCheckpointStatisticCacheEntries()); } @Override @@ -162,14 +169,23 @@ public class DispatcherRestEndpoint extends RestServerEndpoint { executionGraphCache, executor); - CheckpointStatisticsHandler checkpointStatisticsHandler = new CheckpointStatisticsHandler( + CheckpointingStatisticsHandler checkpointStatisticsHandler = new CheckpointingStatisticsHandler( restAddressFuture, leaderRetriever, timeout, - CheckpointStatisticsHeaders.getInstance(), + CheckpointingStatisticsHeaders.getInstance(), executionGraphCache, executor); + CheckpointStatisticDetailsHandler checkpointStatisticDetailsHandler = new CheckpointStatisticDetailsHandler( + restAddressFuture, + leaderRetriever, + timeout, + CheckpointStatisticDetailsHeaders.getInstance(), + executionGraphCache, + executor, + checkpointStatsCache); + final File tmpDir = restConfiguration.getTmpDir(); Optional<StaticFileServerHandler<DispatcherGateway>> optWebContent; @@ -192,7 +208,8 @@ public class DispatcherRestEndpoint extends RestServerEndpoint { handlers.add(Tuple2.of(JobTerminationHeaders.getInstance(), jobTerminationHandler)); handlers.add(Tuple2.of(JobConfigHeaders.getInstance(), jobConfigHandler)); handlers.add(Tuple2.of(CheckpointConfigHeaders.getInstance(), checkpointConfigHandler)); - handlers.add(Tuple2.of(CheckpointStatisticsHeaders.getInstance(), checkpointStatisticsHandler)); + handlers.add(Tuple2.of(CheckpointingStatisticsHeaders.getInstance(), checkpointStatisticsHandler)); + handlers.add(Tuple2.of(CheckpointStatisticDetailsHeaders.getInstance(), checkpointStatisticDetailsHandler)); BlobServerPortHandler blobServerPortHandler = new BlobServerPortHandler(restAddressFuture, leaderRetriever, timeout); handlers.add(Tuple2.of(blobServerPortHandler.getMessageHeaders(), blobServerPortHandler)); http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java index 9220bd9..0344597 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java @@ -32,14 +32,22 @@ public class RestHandlerConfiguration { private final long refreshInterval; + private final int maxCheckpointStatisticCacheEntries; + private final Time timeout; private final File tmpDir; - public RestHandlerConfiguration(long refreshInterval, Time timeout, File tmpDir) { + public RestHandlerConfiguration( + long refreshInterval, + int maxCheckpointStatisticCacheEntries, + Time timeout, + File tmpDir) { Preconditions.checkArgument(refreshInterval > 0L, "The refresh interval (ms) should be larger than 0."); this.refreshInterval = refreshInterval; + this.maxCheckpointStatisticCacheEntries = maxCheckpointStatisticCacheEntries; + this.timeout = Preconditions.checkNotNull(timeout); this.tmpDir = Preconditions.checkNotNull(tmpDir); } @@ -48,6 +56,10 @@ public class RestHandlerConfiguration { return refreshInterval; } + public int getMaxCheckpointStatisticCacheEntries() { + return maxCheckpointStatisticCacheEntries; + } + public Time getTimeout() { return timeout; } @@ -59,10 +71,12 @@ public class RestHandlerConfiguration { public static RestHandlerConfiguration fromConfiguration(Configuration configuration) { final long refreshInterval = configuration.getLong(WebOptions.REFRESH_INTERVAL); + final int maxCheckpointStatisticCacheEntries = configuration.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE); + final Time timeout = Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT)); final File tmpDir = new File(configuration.getString(WebOptions.TMP_DIR)); - return new RestHandlerConfiguration(refreshInterval, timeout, tmpDir); + return new RestHandlerConfiguration(refreshInterval, maxCheckpointStatisticCacheEntries, timeout, tmpDir); } } http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java index f2b1ac8..5348b55 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java @@ -45,7 +45,7 @@ import java.util.concurrent.Executor; * * @param <R> response type */ -public abstract class AbstractExecutionGraphHandler<R extends ResponseBody> extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, R, JobMessageParameters> { +public abstract class AbstractExecutionGraphHandler<R extends ResponseBody, M extends JobMessageParameters> extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, R, M> { private final ExecutionGraphCache executionGraphCache; @@ -55,7 +55,7 @@ public abstract class AbstractExecutionGraphHandler<R extends ResponseBody> exte CompletableFuture<String> localRestAddress, GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, - MessageHeaders<EmptyRequestBody, R, JobMessageParameters> messageHeaders, + MessageHeaders<EmptyRequestBody, R, M> messageHeaders, ExecutionGraphCache executionGraphCache, Executor executor) { super(localRestAddress, leaderRetriever, timeout, messageHeaders); @@ -65,7 +65,7 @@ public abstract class AbstractExecutionGraphHandler<R extends ResponseBody> exte } @Override - protected CompletableFuture<R> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, JobMessageParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException { + protected CompletableFuture<R> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, M> request, @Nonnull RestfulGateway gateway) throws RestHandlerException { JobID jobId = request.getPathParameter(JobIDPathParameter.class); CompletableFuture<AccessExecutionGraph> executionGraphFuture = executionGraphCache.getExecutionGraph(jobId, gateway); @@ -73,7 +73,7 @@ public abstract class AbstractExecutionGraphHandler<R extends ResponseBody> exte return executionGraphFuture.thenApplyAsync( executionGraph -> { try { - return handleRequest(executionGraph); + return handleRequest(request, executionGraph); } catch (RestHandlerException rhe) { throw new CompletionException(rhe); } @@ -81,5 +81,5 @@ public abstract class AbstractExecutionGraphHandler<R extends ResponseBody> exte executor); } - protected abstract R handleRequest(AccessExecutionGraph executionGraph) throws RestHandlerException; + protected abstract R handleRequest(HandlerRequest<EmptyRequestBody, M> request, AccessExecutionGraph executionGraph) throws RestHandlerException; } http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java index bbe4eef..f27d84f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.rest.handler.job; import org.apache.flink.api.common.ArchivedExecutionConfig; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rest.messages.JobConfigInfo; @@ -35,7 +36,7 @@ import java.util.concurrent.Executor; /** * Handler serving the job configuration. */ -public class JobConfigHandler extends AbstractExecutionGraphHandler<JobConfigInfo> { +public class JobConfigHandler extends AbstractExecutionGraphHandler<JobConfigInfo, JobMessageParameters> { public JobConfigHandler( CompletableFuture<String> localRestAddress, @@ -55,7 +56,7 @@ public class JobConfigHandler extends AbstractExecutionGraphHandler<JobConfigInf } @Override - protected JobConfigInfo handleRequest(AccessExecutionGraph executionGraph) { + protected JobConfigInfo handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters> request, AccessExecutionGraph executionGraph) { final ArchivedExecutionConfig executionConfig = executionGraph.getArchivedExecutionConfig(); final JobConfigInfo.ExecutionConfigInfo executionConfigInfo; http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointHandler.java new file mode 100644 index 0000000..62ed1a4 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointHandler.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job.checkpoints; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats; +import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler; +import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointIdPathParameter; +import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointMessageParameters; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * Base class for checkpoint related REST handler. + * + * @param <R> type of the response + */ +public abstract class AbstractCheckpointHandler<R extends ResponseBody> extends AbstractExecutionGraphHandler<R, CheckpointMessageParameters> { + + private final CheckpointStatsCache checkpointStatsCache; + + protected AbstractCheckpointHandler( + CompletableFuture<String> localRestAddress, + GatewayRetriever<? extends RestfulGateway> leaderRetriever, + Time timeout, + MessageHeaders<EmptyRequestBody, R, CheckpointMessageParameters> messageHeaders, + ExecutionGraphCache executionGraphCache, + Executor executor, + CheckpointStatsCache checkpointStatsCache) { + super(localRestAddress, leaderRetriever, timeout, messageHeaders, executionGraphCache, executor); + + this.checkpointStatsCache = Preconditions.checkNotNull(checkpointStatsCache); + } + + @Override + protected R handleRequest(HandlerRequest<EmptyRequestBody, CheckpointMessageParameters> request, AccessExecutionGraph executionGraph) throws RestHandlerException { + final long checkpointId = request.getPathParameter(CheckpointIdPathParameter.class); + + final CheckpointStatsSnapshot checkpointStatsSnapshot = executionGraph.getCheckpointStatsSnapshot(); + + if (checkpointStatsSnapshot != null) { + AbstractCheckpointStats checkpointStats = checkpointStatsSnapshot.getHistory().getCheckpointById(checkpointId); + + if (checkpointStats != null) { + checkpointStatsCache.tryAdd(checkpointStats); + } else { + checkpointStats = checkpointStatsCache.tryGet(checkpointId); + } + + if (checkpointStats != null) { + return handleCheckpointRequest(checkpointStats); + } else { + throw new RestHandlerException("Could not find checkpointing statistics for checkpoint " + checkpointId + '.', HttpResponseStatus.NOT_FOUND); + } + } else { + throw new RestHandlerException("Checkpointing was not enabled for job " + executionGraph.getJobID() + '.', HttpResponseStatus.NOT_FOUND); + } + } + + protected abstract R handleCheckpointRequest(AbstractCheckpointStats checkpointStats); +} http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java index 94646eb..1efa7af 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java @@ -22,13 +22,14 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; +import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.RestHandlerException; import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler; import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; -import org.apache.flink.runtime.rest.messages.CheckpointConfigInfo; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rest.messages.JobMessageParameters; import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigInfo; import org.apache.flink.runtime.webmonitor.RestfulGateway; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; @@ -40,7 +41,7 @@ import java.util.concurrent.Executor; /** * Handler which serves the checkpoint configuration. */ -public class CheckpointConfigHandler extends AbstractExecutionGraphHandler<CheckpointConfigInfo> { +public class CheckpointConfigHandler extends AbstractExecutionGraphHandler<CheckpointConfigInfo, JobMessageParameters> { public CheckpointConfigHandler( CompletableFuture<String> localRestAddress, @@ -59,7 +60,7 @@ public class CheckpointConfigHandler extends AbstractExecutionGraphHandler<Check } @Override - protected CheckpointConfigInfo handleRequest(AccessExecutionGraph executionGraph) throws RestHandlerException { + protected CheckpointConfigInfo handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters> request, AccessExecutionGraph executionGraph) throws RestHandlerException { final CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration = executionGraph.getCheckpointCoordinatorConfiguration(); if (checkpointCoordinatorConfiguration == null) { http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java new file mode 100644 index 0000000..2fc3008 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job.checkpoints; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats; +import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointMessageParameters; +import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * REST handler which returns the details for a checkpoint. + */ +public class CheckpointStatisticDetailsHandler extends AbstractCheckpointHandler<CheckpointStatistics> { + + public CheckpointStatisticDetailsHandler( + CompletableFuture<String> localRestAddress, + GatewayRetriever<? extends RestfulGateway> leaderRetriever, + Time timeout, + MessageHeaders<EmptyRequestBody, CheckpointStatistics, CheckpointMessageParameters> messageHeaders, + ExecutionGraphCache executionGraphCache, + Executor executor, + CheckpointStatsCache checkpointStatsCache) { + super(localRestAddress, leaderRetriever, timeout, messageHeaders, executionGraphCache, executor, checkpointStatsCache); + } + + @Override + protected CheckpointStatistics handleCheckpointRequest(AbstractCheckpointStats checkpointStats) { + return CheckpointStatistics.generateCheckpointStatistics(checkpointStats, true); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticsHandler.java deleted file mode 100644 index 21ded78..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticsHandler.java +++ /dev/null @@ -1,181 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rest.handler.job.checkpoints; - -import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats; -import org.apache.flink.runtime.checkpoint.CheckpointStatsCounts; -import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory; -import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot; -import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats; -import org.apache.flink.runtime.checkpoint.CompletedCheckpointStatsSummary; -import org.apache.flink.runtime.checkpoint.FailedCheckpointStats; -import org.apache.flink.runtime.checkpoint.MinMaxAvgStats; -import org.apache.flink.runtime.checkpoint.RestoredCheckpointStats; -import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; -import org.apache.flink.runtime.rest.handler.RestHandlerException; -import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler; -import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; -import org.apache.flink.runtime.rest.messages.CheckpointStatistics; -import org.apache.flink.runtime.rest.messages.EmptyRequestBody; -import org.apache.flink.runtime.rest.messages.JobMessageParameters; -import org.apache.flink.runtime.rest.messages.MessageHeaders; -import org.apache.flink.runtime.webmonitor.RestfulGateway; -import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; - -import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; - -/** - * Handler which serves the checkpoint statistics. - */ -public class CheckpointStatisticsHandler extends AbstractExecutionGraphHandler<CheckpointStatistics> { - - public CheckpointStatisticsHandler( - CompletableFuture<String> localRestAddress, - GatewayRetriever<? extends RestfulGateway> leaderRetriever, - Time timeout, - MessageHeaders<EmptyRequestBody, CheckpointStatistics, JobMessageParameters> messageHeaders, - ExecutionGraphCache executionGraphCache, - Executor executor) { - super(localRestAddress, leaderRetriever, timeout, messageHeaders, executionGraphCache, executor); - } - - @Override - protected CheckpointStatistics handleRequest(AccessExecutionGraph executionGraph) throws RestHandlerException { - - final CheckpointStatsSnapshot checkpointStatsSnapshot = executionGraph.getCheckpointStatsSnapshot(); - - if (checkpointStatsSnapshot == null) { - throw new RestHandlerException("Checkpointing has not been enabled.", HttpResponseStatus.NOT_FOUND); - } else { - final CheckpointStatsCounts checkpointStatsCounts = checkpointStatsSnapshot.getCounts(); - - final CheckpointStatistics.Counts counts = new CheckpointStatistics.Counts( - checkpointStatsCounts.getNumberOfRestoredCheckpoints(), - checkpointStatsCounts.getTotalNumberOfCheckpoints(), - checkpointStatsCounts.getNumberOfInProgressCheckpoints(), - checkpointStatsCounts.getNumberOfCompletedCheckpoints(), - checkpointStatsCounts.getNumberOfFailedCheckpoints()); - - final CompletedCheckpointStatsSummary checkpointStatsSummary = checkpointStatsSnapshot.getSummaryStats(); - final MinMaxAvgStats stateSize = checkpointStatsSummary.getStateSizeStats(); - final MinMaxAvgStats duration = checkpointStatsSummary.getEndToEndDurationStats(); - final MinMaxAvgStats alignment = checkpointStatsSummary.getAlignmentBufferedStats(); - - final CheckpointStatistics.Summary summary = new CheckpointStatistics.Summary( - new CheckpointStatistics.MinMaxAvgStatistics( - stateSize.getMinimum(), - stateSize.getMaximum(), - stateSize.getAverage()), - new CheckpointStatistics.MinMaxAvgStatistics( - duration.getMinimum(), - duration.getMaximum(), - duration.getAverage()), - new CheckpointStatistics.MinMaxAvgStatistics( - alignment.getMinimum(), - alignment.getMaximum(), - alignment.getAverage())); - - final CheckpointStatsHistory checkpointStatsHistory = checkpointStatsSnapshot.getHistory(); - - final CheckpointStatistics.CompletedCheckpointStatistics completed = (CheckpointStatistics.CompletedCheckpointStatistics) generateCheckpointStatistics(checkpointStatsHistory.getLatestCompletedCheckpoint()); - final CheckpointStatistics.CompletedCheckpointStatistics savepoint = (CheckpointStatistics.CompletedCheckpointStatistics) generateCheckpointStatistics(checkpointStatsHistory.getLatestSavepoint()); - final CheckpointStatistics.FailedCheckpointStatistics failed = (CheckpointStatistics.FailedCheckpointStatistics) generateCheckpointStatistics(checkpointStatsHistory.getLatestFailedCheckpoint()); - - final RestoredCheckpointStats restoredCheckpointStats = checkpointStatsSnapshot.getLatestRestoredCheckpoint(); - - final CheckpointStatistics.RestoredCheckpointStatistics restored; - - if (restoredCheckpointStats == null) { - restored = null; - } else { - restored = new CheckpointStatistics.RestoredCheckpointStatistics( - restoredCheckpointStats.getCheckpointId(), - restoredCheckpointStats.getRestoreTimestamp(), - restoredCheckpointStats.getProperties().isSavepoint(), - restoredCheckpointStats.getExternalPath()); - } - - final CheckpointStatistics.LatestCheckpoints latestCheckpoints = new CheckpointStatistics.LatestCheckpoints( - completed, - savepoint, - failed, - restored); - - final List<CheckpointStatistics.BaseCheckpointStatistics> history = new ArrayList<>(16); - - for (AbstractCheckpointStats abstractCheckpointStats : checkpointStatsSnapshot.getHistory().getCheckpoints()) { - history.add(generateCheckpointStatistics(abstractCheckpointStats)); - } - - return new CheckpointStatistics( - counts, - summary, - latestCheckpoints, - history); - } - } - - private static CheckpointStatistics.BaseCheckpointStatistics generateCheckpointStatistics(AbstractCheckpointStats checkpointStats) { - if (checkpointStats != null) { - if (checkpointStats instanceof CompletedCheckpointStats) { - final CompletedCheckpointStats completedCheckpointStats = ((CompletedCheckpointStats) checkpointStats); - - return new CheckpointStatistics.CompletedCheckpointStatistics( - completedCheckpointStats.getCheckpointId(), - completedCheckpointStats.getStatus(), - completedCheckpointStats.getProperties().isSavepoint(), - completedCheckpointStats.getTriggerTimestamp(), - completedCheckpointStats.getLatestAckTimestamp(), - completedCheckpointStats.getStateSize(), - completedCheckpointStats.getEndToEndDuration(), - completedCheckpointStats.getAlignmentBuffered(), - completedCheckpointStats.getNumberOfSubtasks(), - completedCheckpointStats.getNumberOfAcknowledgedSubtasks(), - completedCheckpointStats.getExternalPath(), - completedCheckpointStats.isDiscarded()); - } else if (checkpointStats instanceof FailedCheckpointStats) { - final FailedCheckpointStats failedCheckpointStats = ((FailedCheckpointStats) checkpointStats); - - return new CheckpointStatistics.FailedCheckpointStatistics( - failedCheckpointStats.getCheckpointId(), - failedCheckpointStats.getStatus(), - failedCheckpointStats.getProperties().isSavepoint(), - failedCheckpointStats.getTriggerTimestamp(), - failedCheckpointStats.getLatestAckTimestamp(), - failedCheckpointStats.getStateSize(), - failedCheckpointStats.getEndToEndDuration(), - failedCheckpointStats.getAlignmentBuffered(), - failedCheckpointStats.getNumberOfSubtasks(), - failedCheckpointStats.getNumberOfAcknowledgedSubtasks(), - failedCheckpointStats.getFailureTimestamp(), - failedCheckpointStats.getFailureMessage()); - } else { - throw new IllegalArgumentException("Given checkpoint stats object of type " + checkpointStats.getClass().getName() + " cannot be converted."); - } - } else { - return null; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatsCache.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatsCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatsCache.java new file mode 100644 index 0000000..dcd36b0 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatsCache.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job.checkpoints; + +import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats; + +import org.apache.flink.shaded.guava18.com.google.common.cache.Cache; +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder; + +import javax.annotation.Nullable; + +/** + * A size-based cache of accessed checkpoints for completed and failed + * checkpoints. + * + * <p>Having this cache in place for accessed stats improves the user + * experience quite a bit as accessed checkpoint stats stay available + * and don't expire. For example if you manage to click on the last + * checkpoint in the history, it is not available via the stats as soon + * as another checkpoint is triggered. With the cache in place, the + * checkpoint will still be available for investigation. + */ +public class CheckpointStatsCache { + + @Nullable + private final Cache<Long, AbstractCheckpointStats> cache; + + public CheckpointStatsCache(int maxNumEntries) { + if (maxNumEntries > 0) { + this.cache = CacheBuilder.<Long, AbstractCheckpointStats>newBuilder() + .maximumSize(maxNumEntries) + .build(); + } else { + this.cache = null; + } + } + + /** + * Try to add the checkpoint to the cache. + * + * @param checkpoint Checkpoint to be added. + */ + public void tryAdd(AbstractCheckpointStats checkpoint) { + // Don't add in progress checkpoints as they will be replaced by their + // completed/failed version eventually. + if (cache != null && checkpoint != null && !checkpoint.getStatus().isInProgress()) { + cache.put(checkpoint.getCheckpointId(), checkpoint); + } + } + + /** + * Try to look up a checkpoint by it's ID in the cache. + * + * @param checkpointId ID of the checkpoint to look up. + * @return The checkpoint or <code>null</code> if checkpoint not found. + */ + public AbstractCheckpointStats tryGet(long checkpointId) { + if (cache != null) { + return cache.getIfPresent(checkpointId); + } else { + return null; + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java new file mode 100644 index 0000000..1c5762e --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job.checkpoints; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats; +import org.apache.flink.runtime.checkpoint.CheckpointStatsCounts; +import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory; +import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot; +import org.apache.flink.runtime.checkpoint.CompletedCheckpointStatsSummary; +import org.apache.flink.runtime.checkpoint.MinMaxAvgStats; +import org.apache.flink.runtime.checkpoint.RestoredCheckpointStats; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler; +import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobMessageParameters; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics; +import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * Handler which serves the checkpoint statistics. + */ +public class CheckpointingStatisticsHandler extends AbstractExecutionGraphHandler<CheckpointingStatistics, JobMessageParameters> { + + public CheckpointingStatisticsHandler( + CompletableFuture<String> localRestAddress, + GatewayRetriever<? extends RestfulGateway> leaderRetriever, + Time timeout, + MessageHeaders<EmptyRequestBody, CheckpointingStatistics, JobMessageParameters> messageHeaders, + ExecutionGraphCache executionGraphCache, + Executor executor) { + super(localRestAddress, leaderRetriever, timeout, messageHeaders, executionGraphCache, executor); + } + + @Override + protected CheckpointingStatistics handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters> request, AccessExecutionGraph executionGraph) throws RestHandlerException { + + final CheckpointStatsSnapshot checkpointStatsSnapshot = executionGraph.getCheckpointStatsSnapshot(); + + if (checkpointStatsSnapshot == null) { + throw new RestHandlerException("Checkpointing has not been enabled.", HttpResponseStatus.NOT_FOUND); + } else { + final CheckpointStatsCounts checkpointStatsCounts = checkpointStatsSnapshot.getCounts(); + + final CheckpointingStatistics.Counts counts = new CheckpointingStatistics.Counts( + checkpointStatsCounts.getNumberOfRestoredCheckpoints(), + checkpointStatsCounts.getTotalNumberOfCheckpoints(), + checkpointStatsCounts.getNumberOfInProgressCheckpoints(), + checkpointStatsCounts.getNumberOfCompletedCheckpoints(), + checkpointStatsCounts.getNumberOfFailedCheckpoints()); + + final CompletedCheckpointStatsSummary checkpointStatsSummary = checkpointStatsSnapshot.getSummaryStats(); + final MinMaxAvgStats stateSize = checkpointStatsSummary.getStateSizeStats(); + final MinMaxAvgStats duration = checkpointStatsSummary.getEndToEndDurationStats(); + final MinMaxAvgStats alignment = checkpointStatsSummary.getAlignmentBufferedStats(); + + final CheckpointingStatistics.Summary summary = new CheckpointingStatistics.Summary( + new CheckpointingStatistics.MinMaxAvgStatistics( + stateSize.getMinimum(), + stateSize.getMaximum(), + stateSize.getAverage()), + new CheckpointingStatistics.MinMaxAvgStatistics( + duration.getMinimum(), + duration.getMaximum(), + duration.getAverage()), + new CheckpointingStatistics.MinMaxAvgStatistics( + alignment.getMinimum(), + alignment.getMaximum(), + alignment.getAverage())); + + final CheckpointStatsHistory checkpointStatsHistory = checkpointStatsSnapshot.getHistory(); + + final CheckpointStatistics.CompletedCheckpointStatistics completed = checkpointStatsHistory.getLatestCompletedCheckpoint() != null ? + (CheckpointStatistics.CompletedCheckpointStatistics) CheckpointStatistics.generateCheckpointStatistics( + checkpointStatsHistory.getLatestCompletedCheckpoint(), + false) : + null; + + final CheckpointStatistics.CompletedCheckpointStatistics savepoint = checkpointStatsHistory.getLatestSavepoint() != null ? + (CheckpointStatistics.CompletedCheckpointStatistics) CheckpointStatistics.generateCheckpointStatistics( + checkpointStatsHistory.getLatestSavepoint(), + false) : + null; + + final CheckpointStatistics.FailedCheckpointStatistics failed = checkpointStatsHistory.getLatestFailedCheckpoint() != null ? + (CheckpointStatistics.FailedCheckpointStatistics) CheckpointStatistics.generateCheckpointStatistics( + checkpointStatsHistory.getLatestFailedCheckpoint(), + false) : + null; + + final RestoredCheckpointStats restoredCheckpointStats = checkpointStatsSnapshot.getLatestRestoredCheckpoint(); + + final CheckpointingStatistics.RestoredCheckpointStatistics restored; + + if (restoredCheckpointStats == null) { + restored = null; + } else { + restored = new CheckpointingStatistics.RestoredCheckpointStatistics( + restoredCheckpointStats.getCheckpointId(), + restoredCheckpointStats.getRestoreTimestamp(), + restoredCheckpointStats.getProperties().isSavepoint(), + restoredCheckpointStats.getExternalPath()); + } + + final CheckpointingStatistics.LatestCheckpoints latestCheckpoints = new CheckpointingStatistics.LatestCheckpoints( + completed, + savepoint, + failed, + restored); + + final List<CheckpointStatistics> history = new ArrayList<>(16); + + for (AbstractCheckpointStats abstractCheckpointStats : checkpointStatsSnapshot.getHistory().getCheckpoints()) { + history.add(CheckpointStatistics.generateCheckpointStatistics(abstractCheckpointStats, false)); + } + + return new CheckpointingStatistics( + counts, + summary, + latestCheckpoints, + history); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java index f50c42d..60b9799 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java @@ -24,7 +24,7 @@ import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler; import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; import org.apache.flink.runtime.rest.handler.legacy.JsonFactory; -import org.apache.flink.runtime.rest.messages.CheckpointConfigInfo; +import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigInfo; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.util.FlinkException; http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCache.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCache.java deleted file mode 100644 index f21fc76..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCache.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rest.handler.legacy.checkpoints; - -import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats; - -import org.apache.flink.shaded.guava18.com.google.common.cache.Cache; -import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder; - -import javax.annotation.Nullable; - -/** - * A size-based cache of accessed checkpoints for completed and failed - * checkpoints. - * - * <p>Having this cache in place for accessed stats improves the user - * experience quite a bit as accessed checkpoint stats stay available - * and don't expire. For example if you manage to click on the last - * checkpoint in the history, it is not available via the stats as soon - * as another checkpoint is triggered. With the cache in place, the - * checkpoint will still be available for investigation. - */ -public class CheckpointStatsCache { - - @Nullable - private final Cache<Long, AbstractCheckpointStats> cache; - - public CheckpointStatsCache(int maxNumEntries) { - if (maxNumEntries > 0) { - this.cache = CacheBuilder.<Long, AbstractCheckpointStats>newBuilder() - .maximumSize(maxNumEntries) - .build(); - } else { - this.cache = null; - } - } - - /** - * Try to add the checkpoint to the cache. - * - * @param checkpoint Checkpoint to be added. - */ - void tryAdd(AbstractCheckpointStats checkpoint) { - // Don't add in progress checkpoints as they will be replaced by their - // completed/failed version eventually. - if (cache != null && checkpoint != null && !checkpoint.getStatus().isInProgress()) { - cache.put(checkpoint.getCheckpointId(), checkpoint); - } - } - - /** - * Try to look up a checkpoint by it's ID in the cache. - * - * @param checkpointId ID of the checkpoint to look up. - * @return The checkpoint or <code>null</code> if checkpoint not found. - */ - AbstractCheckpointStats tryGet(long checkpointId) { - if (cache != null) { - return cache.getIfPresent(checkpointId); - } else { - return null; - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java index e277971..dce1641 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats; import org.apache.flink.runtime.checkpoint.FailedCheckpointStats; import org.apache.flink.runtime.checkpoint.TaskStateStats; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache; import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler; import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; import org.apache.flink.runtime.rest.handler.legacy.JsonFactory; http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsSubtasksHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsSubtasksHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsSubtasksHandler.java index 5420cf4..1421fb2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsSubtasksHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsSubtasksHandler.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmaster.JobManagerGateway; +import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache; import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler; import org.apache.flink.runtime.rest.handler.legacy.AbstractJobVertexRequestHandler; import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java index 5b35c7f..b6c86be 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java @@ -31,7 +31,8 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler; import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; import org.apache.flink.runtime.rest.handler.legacy.JsonFactory; -import org.apache.flink.runtime.rest.messages.CheckpointStatistics; +import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics; +import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.util.FlinkException; @@ -129,37 +130,37 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler } private static void writeCounts(JsonGenerator gen, CheckpointStatsCounts counts) throws IOException { - gen.writeObjectFieldStart(CheckpointStatistics.FIELD_NAME_COUNTS); - gen.writeNumberField(CheckpointStatistics.Counts.FIELD_NAME_RESTORED_CHECKPOINTS, counts.getNumberOfRestoredCheckpoints()); - gen.writeNumberField(CheckpointStatistics.Counts.FIELD_NAME_TOTAL_CHECKPOINTS, counts.getTotalNumberOfCheckpoints()); - gen.writeNumberField(CheckpointStatistics.Counts.FIELD_NAME_IN_PROGRESS_CHECKPOINTS, counts.getNumberOfInProgressCheckpoints()); - gen.writeNumberField(CheckpointStatistics.Counts.FIELD_NAME_COMPLETED_CHECKPOINTS, counts.getNumberOfCompletedCheckpoints()); - gen.writeNumberField(CheckpointStatistics.Counts.FIELD_NAME_FAILED_CHECKPOINTS, counts.getNumberOfFailedCheckpoints()); + gen.writeObjectFieldStart(CheckpointingStatistics.FIELD_NAME_COUNTS); + gen.writeNumberField(CheckpointingStatistics.Counts.FIELD_NAME_RESTORED_CHECKPOINTS, counts.getNumberOfRestoredCheckpoints()); + gen.writeNumberField(CheckpointingStatistics.Counts.FIELD_NAME_TOTAL_CHECKPOINTS, counts.getTotalNumberOfCheckpoints()); + gen.writeNumberField(CheckpointingStatistics.Counts.FIELD_NAME_IN_PROGRESS_CHECKPOINTS, counts.getNumberOfInProgressCheckpoints()); + gen.writeNumberField(CheckpointingStatistics.Counts.FIELD_NAME_COMPLETED_CHECKPOINTS, counts.getNumberOfCompletedCheckpoints()); + gen.writeNumberField(CheckpointingStatistics.Counts.FIELD_NAME_FAILED_CHECKPOINTS, counts.getNumberOfFailedCheckpoints()); gen.writeEndObject(); } private static void writeSummary( JsonGenerator gen, CompletedCheckpointStatsSummary summary) throws IOException { - gen.writeObjectFieldStart(CheckpointStatistics.FIELD_NAME_SUMMARY); - gen.writeObjectFieldStart(CheckpointStatistics.Summary.FIELD_NAME_STATE_SIZE); + gen.writeObjectFieldStart(CheckpointingStatistics.FIELD_NAME_SUMMARY); + gen.writeObjectFieldStart(CheckpointingStatistics.Summary.FIELD_NAME_STATE_SIZE); writeMinMaxAvg(gen, summary.getStateSizeStats()); gen.writeEndObject(); - gen.writeObjectFieldStart(CheckpointStatistics.Summary.FIELD_NAME_DURATION); + gen.writeObjectFieldStart(CheckpointingStatistics.Summary.FIELD_NAME_DURATION); writeMinMaxAvg(gen, summary.getEndToEndDurationStats()); gen.writeEndObject(); - gen.writeObjectFieldStart(CheckpointStatistics.Summary.FIELD_NAME_ALIGNMENT_BUFFERED); + gen.writeObjectFieldStart(CheckpointingStatistics.Summary.FIELD_NAME_ALIGNMENT_BUFFERED); writeMinMaxAvg(gen, summary.getAlignmentBufferedStats()); gen.writeEndObject(); gen.writeEndObject(); } static void writeMinMaxAvg(JsonGenerator gen, MinMaxAvgStats minMaxAvg) throws IOException { - gen.writeNumberField(CheckpointStatistics.MinMaxAvgStatistics.FIELD_NAME_MINIMUM, minMaxAvg.getMinimum()); - gen.writeNumberField(CheckpointStatistics.MinMaxAvgStatistics.FIELD_NAME_MAXIMUM, minMaxAvg.getMaximum()); - gen.writeNumberField(CheckpointStatistics.MinMaxAvgStatistics.FIELD_NAME_AVERAGE, minMaxAvg.getAverage()); + gen.writeNumberField(CheckpointingStatistics.MinMaxAvgStatistics.FIELD_NAME_MINIMUM, minMaxAvg.getMinimum()); + gen.writeNumberField(CheckpointingStatistics.MinMaxAvgStatistics.FIELD_NAME_MAXIMUM, minMaxAvg.getMaximum()); + gen.writeNumberField(CheckpointingStatistics.MinMaxAvgStatistics.FIELD_NAME_AVERAGE, minMaxAvg.getAverage()); } private static void writeLatestCheckpoints( @@ -169,10 +170,10 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler @Nullable FailedCheckpointStats failed, @Nullable RestoredCheckpointStats restored) throws IOException { - gen.writeObjectFieldStart(CheckpointStatistics.FIELD_NAME_LATEST_CHECKPOINTS); + gen.writeObjectFieldStart(CheckpointingStatistics.FIELD_NAME_LATEST_CHECKPOINTS); // Completed checkpoint if (completed != null) { - gen.writeObjectFieldStart(CheckpointStatistics.LatestCheckpoints.FIELD_NAME_COMPLETED); + gen.writeObjectFieldStart(CheckpointingStatistics.LatestCheckpoints.FIELD_NAME_COMPLETED); writeCheckpoint(gen, completed); String externalPath = completed.getExternalPath(); @@ -185,7 +186,7 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler // Completed savepoint if (savepoint != null) { - gen.writeObjectFieldStart(CheckpointStatistics.LatestCheckpoints.FIELD_NAME_SAVEPOINT); + gen.writeObjectFieldStart(CheckpointingStatistics.LatestCheckpoints.FIELD_NAME_SAVEPOINT); writeCheckpoint(gen, savepoint); String externalPath = savepoint.getExternalPath(); @@ -197,7 +198,7 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler // Failed checkpoint if (failed != null) { - gen.writeObjectFieldStart(CheckpointStatistics.LatestCheckpoints.FIELD_NAME_FAILED); + gen.writeObjectFieldStart(CheckpointingStatistics.LatestCheckpoints.FIELD_NAME_FAILED); writeCheckpoint(gen, failed); gen.writeNumberField(CheckpointStatistics.FailedCheckpointStatistics.FIELD_NAME_FAILURE_TIMESTAMP, failed.getFailureTimestamp()); @@ -210,14 +211,14 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler // Restored checkpoint if (restored != null) { - gen.writeObjectFieldStart(CheckpointStatistics.LatestCheckpoints.FIELD_NAME_RESTORED); - gen.writeNumberField(CheckpointStatistics.RestoredCheckpointStatistics.FIELD_NAME_ID, restored.getCheckpointId()); - gen.writeNumberField(CheckpointStatistics.RestoredCheckpointStatistics.FIELD_NAME_RESTORE_TIMESTAMP, restored.getRestoreTimestamp()); - gen.writeBooleanField(CheckpointStatistics.RestoredCheckpointStatistics.FIELD_NAME_IS_SAVEPOINT, restored.getProperties().isSavepoint()); + gen.writeObjectFieldStart(CheckpointingStatistics.LatestCheckpoints.FIELD_NAME_RESTORED); + gen.writeNumberField(CheckpointingStatistics.RestoredCheckpointStatistics.FIELD_NAME_ID, restored.getCheckpointId()); + gen.writeNumberField(CheckpointingStatistics.RestoredCheckpointStatistics.FIELD_NAME_RESTORE_TIMESTAMP, restored.getRestoreTimestamp()); + gen.writeBooleanField(CheckpointingStatistics.RestoredCheckpointStatistics.FIELD_NAME_IS_SAVEPOINT, restored.getProperties().isSavepoint()); String externalPath = restored.getExternalPath(); if (externalPath != null) { - gen.writeStringField(CheckpointStatistics.RestoredCheckpointStatistics.FIELD_NAME_EXTERNAL_PATH, externalPath); + gen.writeStringField(CheckpointingStatistics.RestoredCheckpointStatistics.FIELD_NAME_EXTERNAL_PATH, externalPath); } gen.writeEndObject(); } @@ -225,29 +226,29 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler } private static void writeCheckpoint(JsonGenerator gen, AbstractCheckpointStats checkpoint) throws IOException { - gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_ID, checkpoint.getCheckpointId()); - gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_TRIGGER_TIMESTAMP, checkpoint.getTriggerTimestamp()); - gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_LATEST_ACK_TIMESTAMP, checkpoint.getLatestAckTimestamp()); - gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_STATE_SIZE, checkpoint.getStateSize()); - gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_DURATION, checkpoint.getEndToEndDuration()); - gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_ALIGNMENT_BUFFERED, checkpoint.getAlignmentBuffered()); + gen.writeNumberField(CheckpointStatistics.FIELD_NAME_ID, checkpoint.getCheckpointId()); + gen.writeNumberField(CheckpointStatistics.FIELD_NAME_TRIGGER_TIMESTAMP, checkpoint.getTriggerTimestamp()); + gen.writeNumberField(CheckpointStatistics.FIELD_NAME_LATEST_ACK_TIMESTAMP, checkpoint.getLatestAckTimestamp()); + gen.writeNumberField(CheckpointStatistics.FIELD_NAME_STATE_SIZE, checkpoint.getStateSize()); + gen.writeNumberField(CheckpointStatistics.FIELD_NAME_DURATION, checkpoint.getEndToEndDuration()); + gen.writeNumberField(CheckpointStatistics.FIELD_NAME_ALIGNMENT_BUFFERED, checkpoint.getAlignmentBuffered()); } private static void writeHistory(JsonGenerator gen, CheckpointStatsHistory history) throws IOException { - gen.writeArrayFieldStart(CheckpointStatistics.FIELD_NAME_HISTORY); + gen.writeArrayFieldStart(CheckpointingStatistics.FIELD_NAME_HISTORY); for (AbstractCheckpointStats checkpoint : history.getCheckpoints()) { gen.writeStartObject(); - gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_ID, checkpoint.getCheckpointId()); - gen.writeStringField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_STATUS, checkpoint.getStatus().toString()); - gen.writeBooleanField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_IS_SAVEPOINT, checkpoint.getProperties().isSavepoint()); - gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_TRIGGER_TIMESTAMP, checkpoint.getTriggerTimestamp()); - gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_LATEST_ACK_TIMESTAMP, checkpoint.getLatestAckTimestamp()); - gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_STATE_SIZE, checkpoint.getStateSize()); - gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_DURATION, checkpoint.getEndToEndDuration()); - gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_ALIGNMENT_BUFFERED, checkpoint.getAlignmentBuffered()); - gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_NUM_SUBTASKS, checkpoint.getNumberOfSubtasks()); - gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_NUM_ACK_SUBTASKS, checkpoint.getNumberOfAcknowledgedSubtasks()); + gen.writeNumberField(CheckpointStatistics.FIELD_NAME_ID, checkpoint.getCheckpointId()); + gen.writeStringField(CheckpointStatistics.FIELD_NAME_STATUS, checkpoint.getStatus().toString()); + gen.writeBooleanField(CheckpointStatistics.FIELD_NAME_IS_SAVEPOINT, checkpoint.getProperties().isSavepoint()); + gen.writeNumberField(CheckpointStatistics.FIELD_NAME_TRIGGER_TIMESTAMP, checkpoint.getTriggerTimestamp()); + gen.writeNumberField(CheckpointStatistics.FIELD_NAME_LATEST_ACK_TIMESTAMP, checkpoint.getLatestAckTimestamp()); + gen.writeNumberField(CheckpointStatistics.FIELD_NAME_STATE_SIZE, checkpoint.getStateSize()); + gen.writeNumberField(CheckpointStatistics.FIELD_NAME_DURATION, checkpoint.getEndToEndDuration()); + gen.writeNumberField(CheckpointStatistics.FIELD_NAME_ALIGNMENT_BUFFERED, checkpoint.getAlignmentBuffered()); + gen.writeNumberField(CheckpointStatistics.FIELD_NAME_NUM_SUBTASKS, checkpoint.getNumberOfSubtasks()); + gen.writeNumberField(CheckpointStatistics.FIELD_NAME_NUM_ACK_SUBTASKS, checkpoint.getNumberOfAcknowledgedSubtasks()); if (checkpoint.getStatus().isCompleted()) { // --- Completed --- http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigHeaders.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigHeaders.java deleted file mode 100644 index bfc0b7a..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigHeaders.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rest.messages; - -import org.apache.flink.runtime.rest.HttpMethodWrapper; -import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler; - -import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; - -/** - * Message headers for the {@link CheckpointConfigHandler}. - */ -public class CheckpointConfigHeaders implements MessageHeaders<EmptyRequestBody, CheckpointConfigInfo, JobMessageParameters> { - - private static final CheckpointConfigHeaders INSTANCE = new CheckpointConfigHeaders(); - - public static final String URL = "/jobs/:jobid/checkpoints/config"; - - private CheckpointConfigHeaders() {} - - @Override - public Class<EmptyRequestBody> getRequestClass() { - return EmptyRequestBody.class; - } - - @Override - public Class<CheckpointConfigInfo> getResponseClass() { - return CheckpointConfigInfo.class; - } - - @Override - public HttpResponseStatus getResponseStatusCode() { - return HttpResponseStatus.OK; - } - - @Override - public JobMessageParameters getUnresolvedMessageParameters() { - return new JobMessageParameters(); - } - - @Override - public HttpMethodWrapper getHttpMethod() { - return HttpMethodWrapper.GET; - } - - @Override - public String getTargetRestEndpointURL() { - return URL; - } - - public static CheckpointConfigHeaders getInstance() { - return INSTANCE; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigInfo.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigInfo.java deleted file mode 100644 index fbda12a..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigInfo.java +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rest.messages; - -import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler; -import org.apache.flink.util.Preconditions; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; - -import java.util.Objects; - -/** - * Response class of the {@link CheckpointConfigHandler}. - */ -public class CheckpointConfigInfo implements ResponseBody { - - public static final String FIELD_NAME_PROCESSING_MODE = "mode"; - - public static final String FIELD_NAME_CHECKPOINT_INTERVAL = "interval"; - - public static final String FIELD_NAME_CHECKPOINT_TIMEOUT = "timeout"; - - public static final String FIELD_NAME_CHECKPOINT_MIN_PAUSE = "min_pause"; - - public static final String FIELD_NAME_CHECKPOINT_MAX_CONCURRENT = "max_concurrent"; - - public static final String FIELD_NAME_EXTERNALIZED_CHECKPOINT_CONFIG = "externalization"; - - @JsonProperty(FIELD_NAME_PROCESSING_MODE) - private final ProcessingMode processingMode; - - @JsonProperty(FIELD_NAME_CHECKPOINT_INTERVAL) - private final long checkpointInterval; - - @JsonProperty(FIELD_NAME_CHECKPOINT_TIMEOUT) - private final long checkpointTimeout; - - @JsonProperty(FIELD_NAME_CHECKPOINT_MIN_PAUSE) - private final long minPauseBetweenCheckpoints; - - @JsonProperty(FIELD_NAME_CHECKPOINT_MAX_CONCURRENT) - private final long maxConcurrentCheckpoints; - - @JsonProperty(FIELD_NAME_EXTERNALIZED_CHECKPOINT_CONFIG) - private final ExternalizedCheckpointInfo externalizedCheckpointInfo; - - @JsonCreator - public CheckpointConfigInfo( - @JsonProperty(FIELD_NAME_PROCESSING_MODE) ProcessingMode processingMode, - @JsonProperty(FIELD_NAME_CHECKPOINT_INTERVAL) long checkpointInterval, - @JsonProperty(FIELD_NAME_CHECKPOINT_TIMEOUT) long checkpointTimeout, - @JsonProperty(FIELD_NAME_CHECKPOINT_MIN_PAUSE) long minPauseBetweenCheckpoints, - @JsonProperty(FIELD_NAME_CHECKPOINT_MAX_CONCURRENT) int maxConcurrentCheckpoints, - @JsonProperty(FIELD_NAME_EXTERNALIZED_CHECKPOINT_CONFIG) ExternalizedCheckpointInfo externalizedCheckpointInfo) { - this.processingMode = Preconditions.checkNotNull(processingMode); - this.checkpointInterval = checkpointInterval; - this.checkpointTimeout = checkpointTimeout; - this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints; - this.maxConcurrentCheckpoints = maxConcurrentCheckpoints; - this.externalizedCheckpointInfo = Preconditions.checkNotNull(externalizedCheckpointInfo); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - CheckpointConfigInfo that = (CheckpointConfigInfo) o; - return checkpointInterval == that.checkpointInterval && - checkpointTimeout == that.checkpointTimeout && - minPauseBetweenCheckpoints == that.minPauseBetweenCheckpoints && - maxConcurrentCheckpoints == that.maxConcurrentCheckpoints && - processingMode == that.processingMode && - Objects.equals(externalizedCheckpointInfo, that.externalizedCheckpointInfo); - } - - @Override - public int hashCode() { - return Objects.hash(processingMode, checkpointInterval, checkpointTimeout, minPauseBetweenCheckpoints, maxConcurrentCheckpoints, externalizedCheckpointInfo); - } - - /** - * Contains information about the externalized checkpoint configuration. - */ - public static final class ExternalizedCheckpointInfo { - - public static final String FIELD_NAME_ENABLED = "enabled"; - - public static final String FIELD_NAME_DELETE_ON_CANCELLATION = "delete_on_cancellation"; - - @JsonProperty(FIELD_NAME_ENABLED) - private final boolean enabled; - - @JsonProperty(FIELD_NAME_DELETE_ON_CANCELLATION) - private final boolean deleteOnCancellation; - - @JsonCreator - public ExternalizedCheckpointInfo( - @JsonProperty(FIELD_NAME_ENABLED) boolean enabled, - @JsonProperty(FIELD_NAME_DELETE_ON_CANCELLATION) boolean deleteOnCancellation) { - this.enabled = enabled; - this.deleteOnCancellation = deleteOnCancellation; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - ExternalizedCheckpointInfo that = (ExternalizedCheckpointInfo) o; - return enabled == that.enabled && - deleteOnCancellation == that.deleteOnCancellation; - } - - @Override - public int hashCode() { - return Objects.hash(enabled, deleteOnCancellation); - } - } - - /** - * Processing mode. - */ - public enum ProcessingMode { - AT_LEAST_ONCE, - EXACTLY_ONCE - } -}
