Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5223#discussion_r161243072
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java
---
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.savepoints;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.rest.NotFoundException;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import
org.apache.flink.runtime.rest.messages.SavepointTriggerIdPathParameter;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointInfo;
+import
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointResponseBody;
+import
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusHeaders;
+import
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters;
+import
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders;
+import
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerId;
+import
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters;
+import
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody;
+import
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerResponseBody;
+import org.apache.flink.runtime.rest.messages.queue.QueueStatus;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.types.Either;
+import org.apache.flink.util.SerializedThrowable;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import
org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+import
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.Immutable;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * HTTP handlers for asynchronous triggering of savepoints.
+ *
+ * <p>Drawing savepoints is a potentially long-running operation. To avoid
blocking HTTP
+ * connections, savepoints must be drawn in two steps. First, an HTTP
request is issued to trigger
+ * the savepoint asynchronously. The request will be assigned a {@link
SavepointTriggerId},
+ * which is returned in the response body. Next, the returned id should be
used to poll the status
+ * of the savepoint until it is finished.
+ *
+ * <p>A savepoint is triggered by sending an HTTP {@code POST} request to
+ * {@code /jobs/:jobid/savepoints}. The HTTP request may contain a JSON
body to specify the target
+ * directory of the savepoint, e.g.,
+ * <pre>
+ * { "target-directory": "/tmp" }
+ * </pre>
+ * If the body is omitted, or the field {@code target-directory} is {@code
null}, the default
+ * savepoint directory as specified by {@link
CoreOptions#SAVEPOINT_DIRECTORY} will be used.
+ * As written above, the response will contain a request id, e.g.,
+ * <pre>
+ * { "request-id": "7d273f5a62eb4730b9dea8e833733c1e" }
+ * </pre>
+ *
+ * <p>To poll for the status of an ongoing savepoint, an HTTP {@code GET}
request is issued to
+ * {@code /jobs/:jobid/savepoints/:savepointtriggerid}. If the specified
savepoint is still ongoing,
+ * the response will be
+ * <pre>
+ * {
+ * "status": {
+ * "id": "IN_PROGRESS"
+ * }
+ * }
+ * </pre>
+ * If the specified savepoint has completed, the status id will transition
to {@code COMPLETED}, and
+ * the response will additionally contain information about the savepoint,
such as the location:
+ * <pre>
+ * {
+ * "status": {
+ * "id": "COMPLETED"
+ * },
+ * "savepoint": {
+ * "request-id": "7d273f5a62eb4730b9dea8e833733c1e",
+ * "location": "/tmp/savepoint-d9813b-8a68e674325b"
+ * }
+ * }
+ * </pre>
+ */
+public class SavepointHandlers {
+
+ /** Cache of ongoing and finished savepoints; both nested handler classes read or write it. */
+ private final CompletedCheckpointCache completedCheckpointCache = new
CompletedCheckpointCache();
+
+ /**
+  * Fallback target directory, used by the trigger handler when a request does not name one.
+  * May be {@code null}; in that case a request without a target directory is rejected.
+  */
+ @Nullable
+ private String defaultSavepointDir;
+
+ /**
+  * Creates a new {@code SavepointHandlers} instance.
+  *
+  * @param defaultSavepointDir default savepoint target directory, or {@code null} if there is none
+  */
+ public SavepointHandlers(@Nullable final String defaultSavepointDir) {
+ this.defaultSavepointDir = defaultSavepointDir;
+ }
+
+ /**
+  * REST handler that starts a savepoint for a job and answers right away with the
+  * {@link SavepointTriggerId} under which the savepoint's outcome can later be queried.
+  */
+ public class SavepointTriggerHandler
+         extends AbstractRestHandler<
+             RestfulGateway,
+             SavepointTriggerRequestBody,
+             SavepointTriggerResponseBody,
+             SavepointTriggerMessageParameters> {
+
+     public SavepointTriggerHandler(
+             final CompletableFuture<String> localRestAddress,
+             final GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+             final Time timeout,
+             final Map<String, String> responseHeaders) {
+         super(
+             localRestAddress,
+             leaderRetriever,
+             timeout,
+             responseHeaders,
+             SavepointTriggerHeaders.getInstance());
+     }
+
+     /**
+      * Triggers a savepoint through the gateway and registers the resulting future in the
+      * shared cache under a freshly created trigger id.
+      *
+      * <p>The target directory from the request body wins over the default directory. If
+      * neither is available, the returned future is completed exceptionally with a
+      * {@link RestHandlerException} carrying {@code BAD_REQUEST}.
+      *
+      * @param request request holding the job id path parameter and the optional target directory
+      * @param gateway gateway used to trigger the savepoint
+      * @return future already completed with the response body containing the trigger id
+      */
+     @Override
+     protected CompletableFuture<SavepointTriggerResponseBody> handleRequest(
+             @Nonnull final HandlerRequest<SavepointTriggerRequestBody, SavepointTriggerMessageParameters> request,
+             @Nonnull final RestfulGateway gateway) throws RestHandlerException {
+
+         final JobID jobId = request.getPathParameter(JobIDPathParameter.class);
+         final String directoryFromBody = request.getRequestBody().getTargetDirectory();
+
+         // Resolve the directory: explicit request value first, configured default second.
+         final String targetDirectory;
+         if (directoryFromBody != null) {
+             targetDirectory = directoryFromBody;
+         } else if (defaultSavepointDir != null) {
+             targetDirectory = defaultSavepointDir;
+         } else {
+             final String message = String.format(
+                 "Config key [%s] is not set. Property [%s] must be provided.",
+                 CoreOptions.SAVEPOINT_DIRECTORY.key(),
+                 SavepointTriggerRequestBody.FIELD_NAME_TARGET_DIRECTORY);
+             return FutureUtils.completedExceptionally(
+                 new RestHandlerException(message, HttpResponseStatus.BAD_REQUEST));
+         }
+
+         final CompletableFuture<CompletedCheckpoint> savepointFuture =
+             gateway.triggerSavepoint(jobId, targetDirectory, RpcUtils.INF_TIMEOUT);
+
+         // The id is handed back to the client, who uses it to poll the status handler.
+         final SavepointTriggerId triggerId = new SavepointTriggerId();
+         completedCheckpointCache.registerOngoingCheckpoint(
+             SavepointKey.of(triggerId, jobId),
+             savepointFuture);
+
+         return CompletableFuture.completedFuture(new SavepointTriggerResponseBody(triggerId));
+     }
+ }
+
+ /**
+  * REST handler that reports whether a previously triggered savepoint is still running,
+  * has completed, or has failed.
+  */
+ public class SavepointStatusHandler
+         extends AbstractRestHandler<
+             RestfulGateway,
+             EmptyRequestBody,
+             SavepointResponseBody,
+             SavepointStatusMessageParameters> {
+
+     public SavepointStatusHandler(
+             final CompletableFuture<String> localRestAddress,
+             final GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+             final Time timeout,
+             final Map<String, String> responseHeaders) {
+         super(
+             localRestAddress,
+             leaderRetriever,
+             timeout,
+             responseHeaders,
+             SavepointStatusHeaders.getInstance());
+     }
+
+     /**
+      * Looks up the savepoint identified by the job id and trigger id path parameters in the
+      * shared cache and translates the lookup result into a response.
+      *
+      * <ul>
+      *   <li>Key unknown to the cache: future completed exceptionally with a {@link NotFoundException}.</li>
+      *   <li>Savepoint still running: in-progress response.</li>
+      *   <li>Savepoint finished: completed response carrying either the external pointer or
+      *       the failure wrapped in a {@link SerializedThrowable}.</li>
+      * </ul>
+      *
+      * @param request request holding the job id and trigger id path parameters
+      * @param gateway gateway supplied by the base handler; not used by this lookup
+      * @return future that is already completed when returned
+      */
+     @Override
+     protected CompletableFuture<SavepointResponseBody> handleRequest(
+             @Nonnull final HandlerRequest<EmptyRequestBody, SavepointStatusMessageParameters> request,
+             @Nonnull final RestfulGateway gateway) throws RestHandlerException {
+
+         final JobID jobId = request.getPathParameter(JobIDPathParameter.class);
+         final SavepointTriggerId triggerId =
+             request.getPathParameter(SavepointTriggerIdPathParameter.class);
+
+         final Either<Throwable, CompletedCheckpoint> outcome;
+         try {
+             outcome = completedCheckpointCache.get(SavepointKey.of(triggerId, jobId));
+         } catch (UnknownSavepointTriggerId e) {
+             return FutureUtils.completedExceptionally(
+                 new NotFoundException(
+                     "Savepoint not found. Savepoint trigger id: " + triggerId + ", job id: " + jobId));
+         }
+
+         // A null lookup result means the savepoint is registered but has not finished yet.
+         if (outcome == null) {
+             return CompletableFuture.completedFuture(SavepointResponseBody.inProgress());
+         }
+
+         // Failure and success both map to a completed queue status; only the info differs.
+         final SavepointInfo info;
+         if (outcome.isLeft()) {
+             info = new SavepointInfo(triggerId, null, new SerializedThrowable(outcome.left()));
+         } else {
+             final CompletedCheckpoint savepoint = outcome.right();
+             info = new SavepointInfo(triggerId, savepoint.getExternalPointer(), null);
+         }
+         return CompletableFuture.completedFuture(
+             new SavepointResponseBody(QueueStatus.completed(), info));
+     }
+ }
+
+ /**
+  * Bookkeeping for savepoints that were triggered through the REST API.
+  *
+  * <p>An ongoing savepoint is registered as a {@code CompletableFuture<CompletedCheckpoint>}.
+  * Once the future finishes, its result or failure is stored in a cache whose entries expire
+  * a fixed time after they were written.
+  */
+ @ThreadSafe
+ static class CompletedCheckpointCache {
+
+     private static final long COMPLETED_CHECKPOINTS_CACHE_DURATION_SECONDS = 300;
+
+     /**
+      * Keys of savepoints whose future has not finished yet. A key leaves this set after its
+      * outcome has been written to {@link #completedCheckpoints}.
+      */
+     private final Set<SavepointKey> registeredSavepointTriggers = ConcurrentHashMap.newKeySet();
+
+     /** Outcomes (failure or completed checkpoint) of finished savepoints. */
+     private final Cache<SavepointKey, Either<Throwable, CompletedCheckpoint>> completedCheckpoints =
+         CacheBuilder.newBuilder()
+             .expireAfterWrite(COMPLETED_CHECKPOINTS_CACHE_DURATION_SECONDS, TimeUnit.SECONDS)
+             .build();
+
+     /**
+      * Starts tracking an ongoing savepoint.
+      *
+      * @param savepointTriggerId key under which the savepoint is tracked
+      * @param checkpointFuture future that completes with the savepoint or fails with an error
+      */
+     void registerOngoingCheckpoint(
+             final SavepointKey savepointTriggerId,
+             final CompletableFuture<CompletedCheckpoint> checkpointFuture) {
+         registeredSavepointTriggers.add(savepointTriggerId);
+         checkpointFuture.whenComplete((completedCheckpoint, error) -> {
+             final Either<Throwable, CompletedCheckpoint> outcome;
+             if (error != null) {
+                 outcome = Either.Left(error);
+             } else {
+                 outcome = Either.Right(completedCheckpoint);
+             }
+             // Publish the outcome before dropping the "ongoing" marker, so the key is
+             // always present in at least one of the two structures.
+             completedCheckpoints.put(savepointTriggerId, outcome);
+             registeredSavepointTriggers.remove(savepointTriggerId);
+         });
+     }
+
+     /**
+      * Looks up the state of a savepoint.
+      *
+      * @param savepointTriggerId key of the savepoint
+      * @return {@code null} while the key is still registered as ongoing, otherwise the
+      *     cached failure or completed checkpoint
+      * @throws UnknownSavepointTriggerId if the key is neither registered as ongoing nor
+      *     present in the cache of finished savepoints
+      */
+     @Nullable
+     Either<Throwable, CompletedCheckpoint> get(
+             final SavepointKey savepointTriggerId) throws UnknownSavepointTriggerId {
+         if (registeredSavepointTriggers.contains(savepointTriggerId)) {
+             return null;
+         }
+
+         final Either<Throwable, CompletedCheckpoint> cachedOutcome =
+             completedCheckpoints.getIfPresent(savepointTriggerId);
+         if (cachedOutcome == null) {
+             throw new UnknownSavepointTriggerId();
+         }
+         return cachedOutcome;
+     }
+ }
+
+ /**
+  * Composite key made of a {@link SavepointTriggerId} and a {@link JobID}, suitable for use
+  * in hash based collections.
+  *
+  * @see CompletedCheckpointCache
+  */
+ @Immutable
+ static class SavepointKey {
+
+     private final SavepointTriggerId savepointTriggerId;
+
+     private final JobID jobId;
+
+     private SavepointKey(final SavepointTriggerId savepointTriggerId, final JobID jobId) {
+         this.savepointTriggerId = requireNonNull(savepointTriggerId);
+         this.jobId = requireNonNull(jobId);
+     }
+
+     /** Creates a key; both arguments must be non-null. */
+     private static SavepointKey of(final SavepointTriggerId savepointTriggerId, final JobID jobId) {
+         return new SavepointKey(savepointTriggerId, jobId);
+     }
+
+     /** Two keys are equal when they have the same class, trigger id and job id. */
+     @Override
+     public boolean equals(final Object o) {
+         if (this == o) {
+             return true;
+         }
+         if (o == null || getClass() != o.getClass()) {
+             return false;
+         }
+
+         final SavepointKey other = (SavepointKey) o;
+         return savepointTriggerId.equals(other.savepointTriggerId)
+             && jobId.equals(other.jobId);
+     }
+
+     /** Combines the hash codes of the trigger id and the job id. */
+     @Override
+     public int hashCode() {
+         return 31 * savepointTriggerId.hashCode() + jobId.hashCode();
+     }
+ }
+
+ /**
+ * Exception that indicates that there is no ongoing or completed
checkpoint for a given
+ * {@link JobID} and {@link SavepointTriggerId} pair.
+ */
+ static class UnknownSavepointTriggerId extends Exception {
--- End diff --
easier to see that this exception originates from within Flink and not an
internal component.
---