[FLINK-8940] [flip6] Add support for dispose savepoint

Adds an AsynchronousOperationHandler for disposing savepoints. The handler is 
registered
under '/savepoint-disposal' and requires a SavepointDisposalRequest JSON object 
containing
the path to the savepoint to be disposed. The RestClusterClient polls the 
status registered
under '/savepoint-disposal/:triggerId' until the operation has been completed.

This closes #5764.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bd715c66
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bd715c66
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bd715c66

Branch: refs/heads/master
Commit: bd715c663c226b6f0042a20d768e57433aa97623
Parents: 19fa504
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Sun Mar 25 19:12:51 2018 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Tue Mar 27 08:34:42 2018 +0200

----------------------------------------------------------------------
 .../apache/flink/client/cli/CliFrontend.java    |   3 +-
 .../flink/client/program/ClusterClient.java     |   4 +-
 .../flink/client/program/MiniClusterClient.java |   4 +-
 .../client/program/rest/RestClusterClient.java  |  38 +++++++
 .../client/cli/CliFrontendSavepointTest.java    |  17 ++-
 .../flink/client/program/ClusterClientTest.java |   4 +-
 .../program/rest/RestClusterClientTest.java     | 107 ++++++++++++++++++
 .../flink/runtime/checkpoint/Checkpoints.java   |  11 +-
 .../flink/runtime/dispatcher/Dispatcher.java    |  21 ++++
 .../flink/runtime/minicluster/MiniCluster.java  |  12 ++
 .../rest/handler/async/OperationKey.java        |   4 +-
 .../savepoints/SavepointDisposalHandlers.java   | 112 +++++++++++++++++++
 .../savepoints/SavepointDisposalRequest.java    |  49 ++++++++
 .../SavepointDisposalStatusHeaders.java         |  75 +++++++++++++
 ...avepointDisposalStatusMessageParameters.java |  46 ++++++++
 .../SavepointDisposalTriggerHeaders.java        |  67 +++++++++++
 .../runtime/webmonitor/RestfulGateway.java      |  13 +++
 .../runtime/webmonitor/WebMonitorEndpoint.java  |  19 ++++
 .../runtime/dispatcher/DispatcherTest.java      |  63 ++++++++++-
 .../SavepointDisposalRequestTest.java           |  47 ++++++++
 20 files changed, 691 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bd715c66/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java 
b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index d636ef7..a874891 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -47,7 +47,6 @@ import org.apache.flink.optimizer.plan.StreamingPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
-import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.messages.Acknowledge;
@@ -706,7 +705,7 @@ public class CliFrontend {
 
                logAndSysout("Disposing savepoint '" + savepointPath + "'.");
 
-               final CompletableFuture<Acknowledge> disposeFuture = 
clusterClient.disposeSavepoint(savepointPath, 
FutureUtils.toTime(clientTimeout));
+               final CompletableFuture<Acknowledge> disposeFuture = 
clusterClient.disposeSavepoint(savepointPath);
 
                logAndSysout("Waiting for response...");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bd715c66/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index f50206d..fbaa515 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -726,14 +726,14 @@ public abstract class ClusterClient<T> {
                });
        }
 
-       public CompletableFuture<Acknowledge> disposeSavepoint(String 
savepointPath, Time timeout) throws FlinkException {
+       public CompletableFuture<Acknowledge> disposeSavepoint(String 
savepointPath) throws FlinkException {
                final ActorGateway jobManager = getJobManagerGateway();
 
                Object msg = new 
JobManagerMessages.DisposeSavepoint(savepointPath);
                CompletableFuture<Object> responseFuture = 
FutureUtils.<Object>toJava(
                        jobManager.ask(
                                msg,
-                               FutureUtils.toFiniteDuration(timeout)));
+                               timeout));
 
                return responseFuture.thenApply(
                        (Object response) -> {

http://git-wip-us.apache.org/repos/asf/flink/blob/bd715c66/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
index 44f6ef6..802622e 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
@@ -123,8 +123,8 @@ public class MiniClusterClient extends 
ClusterClient<MiniClusterClient.MiniClust
        }
 
        @Override
-       public CompletableFuture<Acknowledge> disposeSavepoint(String 
savepointPath, Time timeout) throws FlinkException {
-               throw new UnsupportedOperationException("MiniClusterClient does 
not yet support this operation.");
+       public CompletableFuture<Acknowledge> disposeSavepoint(String 
savepointPath) throws FlinkException {
+               return guardWithSingleRetry(() -> 
miniCluster.disposeSavepoint(savepointPath), scheduledExecutor);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/bd715c66/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 2e1ffb0..9129714 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -72,6 +72,10 @@ import 
org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
 import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalRequest;
+import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalStatusHeaders;
+import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalStatusMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalTriggerHeaders;
 import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointInfo;
 import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusHeaders;
 import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters;
@@ -530,6 +534,40 @@ public class RestClusterClient<T> extends ClusterClient<T> 
{
                        });
        }
 
+       @Override
+       public CompletableFuture<Acknowledge> disposeSavepoint(String 
savepointPath, Time timeout) {
+               final SavepointDisposalRequest savepointDisposalRequest = new 
SavepointDisposalRequest(savepointPath);
+
+               final CompletableFuture<TriggerResponse> 
savepointDisposalTriggerFuture = sendRequest(
+                       SavepointDisposalTriggerHeaders.getInstance(),
+                       EmptyMessageParameters.getInstance(),
+                       savepointDisposalRequest);
+
+               final CompletableFuture<AsynchronousOperationInfo> 
savepointDisposalFuture = savepointDisposalTriggerFuture.thenCompose(
+                       (TriggerResponse triggerResponse) -> {
+                               final TriggerId triggerId = 
triggerResponse.getTriggerId();
+                               final SavepointDisposalStatusHeaders 
savepointDisposalStatusHeaders = SavepointDisposalStatusHeaders.getInstance();
+                               final SavepointDisposalStatusMessageParameters 
savepointDisposalStatusMessageParameters = 
savepointDisposalStatusHeaders.getUnresolvedMessageParameters();
+                               
savepointDisposalStatusMessageParameters.triggerIdPathParameter.resolve(triggerId);
+
+                               return pollResourceAsync(
+                                       () -> sendRetryableRequest(
+                                               savepointDisposalStatusHeaders,
+                                               
savepointDisposalStatusMessageParameters,
+                                               EmptyRequestBody.getInstance(),
+                                               
isConnectionProblemException()));
+                       });
+
+               return savepointDisposalFuture.thenApply(
+                       (AsynchronousOperationInfo asynchronousOperationInfo) 
-> {
+                               if (asynchronousOperationInfo.getFailureCause() 
== null) {
+                                       return Acknowledge.get();
+                               } else {
+                                       throw new 
CompletionException(asynchronousOperationInfo.getFailureCause());
+                               }
+                       });
+       }
+
        /**
         * Creates a {@code CompletableFuture} that polls a {@code 
AsynchronouslyCreatedResource} until
         * its {@link AsynchronouslyCreatedResource#queueStatus() QueueStatus} 
becomes

http://git-wip-us.apache.org/repos/asf/flink/blob/bd715c66/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendSavepointTest.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendSavepointTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendSavepointTest.java
index 3195a6b..75f3b3d 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendSavepointTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendSavepointTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.client.cli;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.client.cli.util.MockedCliFrontend;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.StandaloneClusterClient;
@@ -41,7 +40,7 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.PrintStream;
 import java.util.concurrent.CompletableFuture;
-import java.util.function.BiFunction;
+import java.util.function.Function;
 import java.util.zip.ZipOutputStream;
 
 import static org.junit.Assert.assertEquals;
@@ -196,7 +195,7 @@ public class CliFrontendSavepointTest extends 
CliFrontendTestBase {
                String savepointPath = "expectedSavepointPath";
 
                ClusterClient clusterClient = new DisposeSavepointClusterClient(
-                       (String path, Time timeout) -> 
CompletableFuture.completedFuture(Acknowledge.get()), getConfiguration());
+                       (String path) -> 
CompletableFuture.completedFuture(Acknowledge.get()), getConfiguration());
 
                try {
 
@@ -225,7 +224,7 @@ public class CliFrontendSavepointTest extends 
CliFrontendTestBase {
                final CompletableFuture<String> disposeSavepointFuture = new 
CompletableFuture<>();
 
                final DisposeSavepointClusterClient clusterClient = new 
DisposeSavepointClusterClient(
-                       (String savepointPath, Time timeout) -> {
+                       (String savepointPath) -> {
                                disposeSavepointFuture.complete(savepointPath);
                                return 
CompletableFuture.completedFuture(Acknowledge.get());
                        }, getConfiguration());
@@ -260,7 +259,7 @@ public class CliFrontendSavepointTest extends 
CliFrontendTestBase {
 
                Exception testException = new 
Exception("expectedTestException");
 
-               DisposeSavepointClusterClient clusterClient = new 
DisposeSavepointClusterClient((String path, Time timeout) -> 
FutureUtils.completedExceptionally(testException), getConfiguration());
+               DisposeSavepointClusterClient clusterClient = new 
DisposeSavepointClusterClient((String path) -> 
FutureUtils.completedExceptionally(testException), getConfiguration());
 
                try {
                        CliFrontend frontend = new 
MockedCliFrontend(clusterClient);
@@ -285,17 +284,17 @@ public class CliFrontendSavepointTest extends 
CliFrontendTestBase {
 
        private static final class DisposeSavepointClusterClient extends 
StandaloneClusterClient {
 
-               private final BiFunction<String, Time, 
CompletableFuture<Acknowledge>> disposeSavepointFunction;
+               private final Function<String, CompletableFuture<Acknowledge>> 
disposeSavepointFunction;
 
-               DisposeSavepointClusterClient(BiFunction<String, Time, 
CompletableFuture<Acknowledge>> disposeSavepointFunction, Configuration 
configuration) {
+               DisposeSavepointClusterClient(Function<String, 
CompletableFuture<Acknowledge>> disposeSavepointFunction, Configuration 
configuration) {
                        super(configuration, new 
TestingHighAvailabilityServices(), false);
 
                        this.disposeSavepointFunction = 
Preconditions.checkNotNull(disposeSavepointFunction);
                }
 
                @Override
-               public CompletableFuture<Acknowledge> disposeSavepoint(String 
savepointPath, Time timeout) {
-                       return disposeSavepointFunction.apply(savepointPath, 
timeout);
+               public CompletableFuture<Acknowledge> disposeSavepoint(String 
savepointPath) {
+                       return disposeSavepointFunction.apply(savepointPath);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bd715c66/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
index f30fd19..07b3821 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
@@ -181,7 +181,7 @@ public class ClusterClientTest extends TestLogger {
 
                final TestClusterClient clusterClient = new 
TestClusterClient(configuration, jobManagerGateway);
 
-               CompletableFuture<Acknowledge> acknowledgeCompletableFuture = 
clusterClient.disposeSavepoint(savepointPath, timeout);
+               CompletableFuture<Acknowledge> acknowledgeCompletableFuture = 
clusterClient.disposeSavepoint(savepointPath);
 
                try {
                        acknowledgeCompletableFuture.get();
@@ -203,7 +203,7 @@ public class ClusterClientTest extends TestLogger {
 
                final TestClusterClient clusterClient = new 
TestClusterClient(configuration, jobManagerGateway);
 
-               CompletableFuture<Acknowledge> acknowledgeCompletableFuture = 
clusterClient.disposeSavepoint(savepointPath, timeout);
+               CompletableFuture<Acknowledge> acknowledgeCompletableFuture = 
clusterClient.disposeSavepoint(savepointPath);
 
                try {
                        acknowledgeCompletableFuture.get();

http://git-wip-us.apache.org/repos/asf/flink/blob/bd715c66/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index 77a4113..926da92 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.rest.RestClient;
@@ -45,6 +46,7 @@ import 
org.apache.flink.runtime.rest.handler.AbstractRestHandler;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo;
 import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult;
 import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
 import 
org.apache.flink.runtime.rest.messages.AccumulatorsIncludeSerializedValueQueryParameter;
@@ -72,6 +74,10 @@ import 
org.apache.flink.runtime.rest.messages.job.JobExecutionResultResponseBody
 import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
 import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalRequest;
+import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalStatusHeaders;
+import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalStatusMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalTriggerHeaders;
 import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointInfo;
 import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusHeaders;
 import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters;
@@ -84,7 +90,9 @@ import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.testutils.category.Flip6;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.OptionalFailure;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedThrowable;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
@@ -103,6 +111,7 @@ import org.mockito.MockitoAnnotations;
 import javax.annotation.Nonnull;
 
 import java.io.IOException;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -112,6 +121,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -537,6 +547,103 @@ public class RestClusterClientTest extends TestLogger {
        }
 
        @Test
+       public void testDisposeSavepoint() throws Exception {
+               final String savepointPath = "foobar";
+               final String exceptionMessage = "Test exception.";
+               final FlinkException testException = new 
FlinkException(exceptionMessage);
+
+               final TestSavepointDisposalHandlers 
testSavepointDisposalHandlers = new 
TestSavepointDisposalHandlers(savepointPath);
+               final 
TestSavepointDisposalHandlers.TestSavepointDisposalTriggerHandler 
testSavepointDisposalTriggerHandler = testSavepointDisposalHandlers.new 
TestSavepointDisposalTriggerHandler();
+               final 
TestSavepointDisposalHandlers.TestSavepointDisposalStatusHandler 
testSavepointDisposalStatusHandler = testSavepointDisposalHandlers.new 
TestSavepointDisposalStatusHandler(
+                       
OptionalFailure.of(AsynchronousOperationInfo.complete()),
+                       
OptionalFailure.of(AsynchronousOperationInfo.completeExceptional(new 
SerializedThrowable(testException))),
+                       OptionalFailure.ofFailure(testException));
+
+               try (TestRestServerEndpoint ignored = createRestServerEndpoint(
+                       testSavepointDisposalStatusHandler,
+                       testSavepointDisposalTriggerHandler)) {
+                       {
+                               final CompletableFuture<Acknowledge> 
disposeSavepointFuture = restClusterClient.disposeSavepoint(savepointPath);
+                               assertThat(disposeSavepointFuture.get(), 
is(Acknowledge.get()));
+                       }
+
+                       {
+                               final CompletableFuture<Acknowledge> 
disposeSavepointFuture = restClusterClient.disposeSavepoint(savepointPath);
+
+                               try {
+                                       disposeSavepointFuture.get();
+                                       fail("Expected an exception");
+                               } catch (ExecutionException ee) {
+                                       
assertThat(ExceptionUtils.findThrowableWithMessage(ee, 
exceptionMessage).isPresent(), is(true));
+                               }
+                       }
+
+                       {
+                               try {
+                                       
restClusterClient.disposeSavepoint(savepointPath).get();
+                                       fail("Expected an exception.");
+                               } catch (ExecutionException ee) {
+                                       
assertThat(ExceptionUtils.findThrowable(ee, 
RestClientException.class).isPresent(), is(true));
+                               }
+                       }
+               }
+       }
+
+       private class TestSavepointDisposalHandlers {
+
+               private final TriggerId triggerId = new TriggerId();
+
+               private final String savepointPath;
+
+               private TestSavepointDisposalHandlers(String savepointPath) {
+                       this.savepointPath = 
Preconditions.checkNotNull(savepointPath);
+               }
+
+               private class TestSavepointDisposalTriggerHandler extends 
TestHandler<SavepointDisposalRequest, TriggerResponse, EmptyMessageParameters> {
+                       private TestSavepointDisposalTriggerHandler() {
+                               
super(SavepointDisposalTriggerHeaders.getInstance());
+                       }
+
+                       @Override
+                       protected CompletableFuture<TriggerResponse> 
handleRequest(@Nonnull HandlerRequest<SavepointDisposalRequest, 
EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) {
+                               
assertThat(request.getRequestBody().getSavepointPath(), is(savepointPath));
+                               return CompletableFuture.completedFuture(new 
TriggerResponse(triggerId));
+                       }
+               }
+
+               private class TestSavepointDisposalStatusHandler extends 
TestHandler<EmptyRequestBody, 
AsynchronousOperationResult<AsynchronousOperationInfo>, 
SavepointDisposalStatusMessageParameters> {
+
+                       private final 
Queue<OptionalFailure<AsynchronousOperationInfo>> responses;
+
+                       private 
TestSavepointDisposalStatusHandler(OptionalFailure<AsynchronousOperationInfo>...
 responses) {
+                               
super(SavepointDisposalStatusHeaders.getInstance());
+                               this.responses = new 
ArrayDeque<>(Arrays.asList(responses));
+                       }
+
+                       @Override
+                       protected 
CompletableFuture<AsynchronousOperationResult<AsynchronousOperationInfo>> 
handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, 
SavepointDisposalStatusMessageParameters> request, @Nonnull DispatcherGateway 
gateway) throws RestHandlerException {
+                               final TriggerId actualTriggerId = 
request.getPathParameter(TriggerIdPathParameter.class);
+
+                               if (actualTriggerId.equals(triggerId)) {
+                                       final 
OptionalFailure<AsynchronousOperationInfo> nextResponse = responses.poll();
+
+                                       if (nextResponse != null) {
+                                               if (nextResponse.isFailure()) {
+                                                       throw new 
RestHandlerException("Failure", HttpResponseStatus.BAD_REQUEST, 
nextResponse.getFailureCause());
+                                               } else {
+                                                       return 
CompletableFuture.completedFuture(AsynchronousOperationResult.completed(nextResponse.getUnchecked()));
+                                               }
+                                       } else {
+                                               throw new AssertionError();
+                                       }
+                               } else {
+                                       throw new AssertionError();
+                               }
+                       }
+               }
+       }
+
+       @Test
        public void testListJobs() throws Exception {
                try (TestRestServerEndpoint ignored = 
createRestServerEndpoint(new TestListJobsHandler())) {
                        {

http://git-wip-us.apache.org/repos/asf/flink/blob/bd715c66/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
index 72b7c53..60fdc17 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
@@ -38,6 +38,7 @@ import org.apache.flink.util.FlinkException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.io.DataInputStream;
@@ -292,6 +293,13 @@ public class Checkpoints {
                checkNotNull(configuration, "configuration");
                checkNotNull(classLoader, "classLoader");
 
+               StateBackend backend = loadStateBackend(configuration, 
classLoader, logger);
+
+               disposeSavepoint(pointer, backend, classLoader);
+       }
+
+       @Nonnull
+       public static StateBackend loadStateBackend(Configuration 
configuration, ClassLoader classLoader, @Nullable Logger logger) {
                if (logger != null) {
                        logger.info("Attempting to load configured state 
backend for savepoint disposal");
                }
@@ -318,8 +326,7 @@ public class Checkpoints {
                        // FileSystem-based for metadata
                        backend = new MemoryStateBackend();
                }
-
-               disposeSavepoint(pointer, backend, classLoader);
+               return backend;
        }
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/bd715c66/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 68b4046..008d4dc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.checkpoint.Checkpoints;
 import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
@@ -75,6 +76,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.stream.Collectors;
 
 /**
@@ -320,6 +322,25 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
        }
 
        @Override
+       public CompletableFuture<Acknowledge> disposeSavepoint(String 
savepointPath, Time timeout) {
+               final ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader();
+
+               return CompletableFuture.supplyAsync(
+                       () -> {
+                               log.info("Disposing savepoint {}.", 
savepointPath);
+
+                               try {
+                                       
Checkpoints.disposeSavepoint(savepointPath, configuration, classLoader, log);
+                               } catch (IOException | FlinkException e) {
+                                       throw new CompletionException(new 
FlinkException(String.format("Could not dispose savepoint %s.", savepointPath), 
e));
+                               }
+
+                               return Acknowledge.get();
+                       },
+                       jobManagerSharedServices.getScheduledExecutorService());
+       }
+
+       @Override
        public CompletableFuture<Acknowledge> cancelJob(JobID jobId, Time 
timeout) {
                JobManagerRunner jobManagerRunner = 
jobManagerRunners.get(jobId);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bd715c66/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 0da6f33..66770c5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -534,6 +534,18 @@ public class MiniCluster implements JobExecutorService, 
AutoCloseableAsync {
                }
        }
 
+       public CompletableFuture<Acknowledge> disposeSavepoint(String 
savepointPath) {
+               try {
+                       return 
getDispatcherGateway().disposeSavepoint(savepointPath, rpcTimeout);
+               } catch (LeaderRetrievalException | InterruptedException e) {
+                       ExceptionUtils.checkInterrupted(e);
+                       return FutureUtils.completedExceptionally(
+                               new FlinkException(
+                                       String.format("Could not dispose 
savepoint %s.", savepointPath),
+                                       e));
+               }
+       }
+
        public CompletableFuture<? extends AccessExecutionGraph> 
getExecutionGraph(JobID jobId) {
                try {
                        return getDispatcherGateway().requestJob(jobId, 
rpcTimeout);

http://git-wip-us.apache.org/repos/asf/flink/blob/bd715c66/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/OperationKey.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/OperationKey.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/OperationKey.java
index a601e56..2f6e4bd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/OperationKey.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/OperationKey.java
@@ -27,11 +27,11 @@ import java.util.Objects;
  * Any operation key for the {@link AbstractAsynchronousOperationHandlers} 
must extend this class.
  * It is used to store the trigger id.
  */
-public abstract class OperationKey {
+public class OperationKey {
 
        private final TriggerId triggerId;
 
-       protected OperationKey(TriggerId triggerId) {
+       public OperationKey(TriggerId triggerId) {
                this.triggerId = Preconditions.checkNotNull(triggerId);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bd715c66/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointDisposalHandlers.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointDisposalHandlers.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointDisposalHandlers.java
new file mode 100644
index 0000000..3cf5f59
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointDisposalHandlers.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.savepoints;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import 
org.apache.flink.runtime.rest.handler.async.AbstractAsynchronousOperationHandlers;
+import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo;
+import org.apache.flink.runtime.rest.handler.async.OperationKey;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.TriggerId;
+import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter;
+import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalRequest;
+import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalStatusHeaders;
+import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalStatusMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalTriggerHeaders;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.SerializedThrowable;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Handlers to trigger the disposal of a savepoint.
+ */
+public class SavepointDisposalHandlers extends 
AbstractAsynchronousOperationHandlers<OperationKey, Acknowledge> {
+
+       /**
+        * {@link TriggerHandler} implementation for the savepoint disposal 
operation.
+        */
+       public class SavepointDisposalTriggerHandler extends 
TriggerHandler<RestfulGateway, SavepointDisposalRequest, 
EmptyMessageParameters> {
+
+               public SavepointDisposalTriggerHandler(
+                               CompletableFuture<String> localRestAddress,
+                               GatewayRetriever<? extends RestfulGateway> 
leaderRetriever,
+                               Time timeout,
+                               Map<String, String> responseHeaders) {
+                       super(
+                               localRestAddress,
+                               leaderRetriever,
+                               timeout,
+                               responseHeaders,
+                               SavepointDisposalTriggerHeaders.getInstance());
+               }
+
+               @Override
+               protected CompletableFuture<Acknowledge> 
triggerOperation(HandlerRequest<SavepointDisposalRequest, 
EmptyMessageParameters> request, RestfulGateway gateway) {
+                       final String savepointPath = 
request.getRequestBody().getSavepointPath();
+                       return gateway.disposeSavepoint(savepointPath, 
RpcUtils.INF_TIMEOUT);
+               }
+
+               @Override
+               protected OperationKey 
createOperationKey(HandlerRequest<SavepointDisposalRequest, 
EmptyMessageParameters> request) {
+                       return new OperationKey(new TriggerId());
+               }
+       }
+
+       /**
+        * {@link StatusHandler} implementation for the savepoint disposal 
operation.
+        */
+       public class SavepointDisposalStatusHandler extends 
StatusHandler<RestfulGateway, AsynchronousOperationInfo, 
SavepointDisposalStatusMessageParameters> {
+
+               public SavepointDisposalStatusHandler(
+                               CompletableFuture<String> localRestAddress,
+                               GatewayRetriever<? extends RestfulGateway> 
leaderRetriever,
+                               Time timeout,
+                               Map<String, String> responseHeaders) {
+                       super(
+                               localRestAddress,
+                               leaderRetriever,
+                               timeout,
+                               responseHeaders,
+                               SavepointDisposalStatusHeaders.getInstance());
+               }
+
+               @Override
+               protected OperationKey 
getOperationKey(HandlerRequest<EmptyRequestBody, 
SavepointDisposalStatusMessageParameters> request) {
+                       final TriggerId triggerId = 
request.getPathParameter(TriggerIdPathParameter.class);
+                       return new OperationKey(triggerId);
+               }
+
+               @Override
+               protected AsynchronousOperationInfo 
exceptionalOperationResultResponse(Throwable throwable) {
+                       return 
AsynchronousOperationInfo.completeExceptional(new 
SerializedThrowable(throwable));
+               }
+
+               @Override
+               protected AsynchronousOperationInfo 
operationResultResponse(Acknowledge operationResult) {
+                       return AsynchronousOperationInfo.complete();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bd715c66/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointDisposalRequest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointDisposalRequest.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointDisposalRequest.java
new file mode 100644
index 0000000..229ae91
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointDisposalRequest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.savepoints;
+
+import org.apache.flink.runtime.rest.messages.RequestBody;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Request body for a savepoint disposal call.
+ */
+public class SavepointDisposalRequest implements RequestBody {
+
+       private static final String FIELD_NAME_SAVEPOINT_PATH = 
"savepoint-path";
+
+       @JsonProperty(FIELD_NAME_SAVEPOINT_PATH)
+       private final String savepointPath;
+
+       @JsonCreator
+       public SavepointDisposalRequest(
+               @JsonProperty(FIELD_NAME_SAVEPOINT_PATH) @Nonnull String 
savepointPath) {
+               this.savepointPath = savepointPath;
+       }
+
+       @JsonIgnore
+       public String getSavepointPath() {
+               return savepointPath;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bd715c66/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointDisposalStatusHeaders.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointDisposalStatusHeaders.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointDisposalStatusHeaders.java
new file mode 100644
index 0000000..74deffd
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointDisposalStatusHeaders.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.savepoints;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo;
+import 
org.apache.flink.runtime.rest.handler.async.AsynchronousOperationStatusMessageHeaders;
+import 
org.apache.flink.runtime.rest.handler.async.AsynchronousOperationTriggerMessageHeaders;
+import 
org.apache.flink.runtime.rest.handler.job.savepoints.SavepointDisposalHandlers;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * {@link AsynchronousOperationTriggerMessageHeaders} implementation for the 
{@link SavepointDisposalHandlers.SavepointDisposalStatusHandler}.
+ */
+public class SavepointDisposalStatusHeaders extends 
AsynchronousOperationStatusMessageHeaders<AsynchronousOperationInfo, 
SavepointDisposalStatusMessageParameters> {
+
+       private static final SavepointDisposalStatusHeaders INSTANCE = new 
SavepointDisposalStatusHeaders();
+
+       private static final String URL = 
String.format("/savepoint-disposal/:%s", TriggerIdPathParameter.KEY);
+
+       private SavepointDisposalStatusHeaders() {}
+
+       @Override
+       public HttpResponseStatus getResponseStatusCode() {
+               return HttpResponseStatus.OK;
+       }
+
+       @Override
+       public Class<EmptyRequestBody> getRequestClass() {
+               return EmptyRequestBody.class;
+       }
+
+       @Override
+       public SavepointDisposalStatusMessageParameters 
getUnresolvedMessageParameters() {
+               return new SavepointDisposalStatusMessageParameters();
+       }
+
+       @Override
+       public HttpMethodWrapper getHttpMethod() {
+               return HttpMethodWrapper.GET;
+       }
+
+       @Override
+       public String getTargetRestEndpointURL() {
+               return URL;
+       }
+
+       public static SavepointDisposalStatusHeaders getInstance() {
+               return INSTANCE;
+       }
+
+       @Override
+       protected Class<AsynchronousOperationInfo> getValueClass() {
+               return AsynchronousOperationInfo.class;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bd715c66/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointDisposalStatusMessageParameters.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointDisposalStatusMessageParameters.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointDisposalStatusMessageParameters.java
new file mode 100644
index 0000000..d8804c9
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointDisposalStatusMessageParameters.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.savepoints;
+
+import 
org.apache.flink.runtime.rest.handler.job.savepoints.SavepointDisposalHandlers;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * {@link MessageParameters} for the {@link 
SavepointDisposalHandlers.SavepointDisposalStatusHandler}.
+ */
+public class SavepointDisposalStatusMessageParameters extends 
MessageParameters {
+
+       public final TriggerIdPathParameter triggerIdPathParameter = new 
TriggerIdPathParameter();
+
+       @Override
+       public Collection<MessagePathParameter<?>> getPathParameters() {
+               return Collections.singleton(triggerIdPathParameter);
+       }
+
+       @Override
+       public Collection<MessageQueryParameter<?>> getQueryParameters() {
+               return Collections.emptyList();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bd715c66/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointDisposalTriggerHeaders.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointDisposalTriggerHeaders.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointDisposalTriggerHeaders.java
new file mode 100644
index 0000000..5786498
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointDisposalTriggerHeaders.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.savepoints;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import 
org.apache.flink.runtime.rest.handler.async.AsynchronousOperationTriggerMessageHeaders;
+import 
org.apache.flink.runtime.rest.handler.job.savepoints.SavepointDisposalHandlers;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * {@link AsynchronousOperationTriggerMessageHeaders} for the {@link 
SavepointDisposalHandlers.SavepointDisposalTriggerHandler}.
+ */
+public class SavepointDisposalTriggerHeaders extends 
AsynchronousOperationTriggerMessageHeaders<SavepointDisposalRequest, 
EmptyMessageParameters> {
+
+       private static final SavepointDisposalTriggerHeaders INSTANCE = new 
SavepointDisposalTriggerHeaders();
+
+       private static final String URL = "/savepoint-disposal";
+
+       private SavepointDisposalTriggerHeaders() {}
+
+       @Override
+       public HttpResponseStatus getResponseStatusCode() {
+               return HttpResponseStatus.OK;
+       }
+
+       @Override
+       public Class<SavepointDisposalRequest> getRequestClass() {
+               return SavepointDisposalRequest.class;
+       }
+
+       @Override
+       public EmptyMessageParameters getUnresolvedMessageParameters() {
+               return EmptyMessageParameters.getInstance();
+       }
+
+       @Override
+       public HttpMethodWrapper getHttpMethod() {
+               return HttpMethodWrapper.POST;
+       }
+
+       @Override
+       public String getTargetRestEndpointURL() {
+               return URL;
+       }
+
+       public static SavepointDisposalTriggerHeaders getInstance() {
+               return INSTANCE;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bd715c66/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
index 4714206..6bb088c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
@@ -145,6 +145,19 @@ public interface RestfulGateway extends RpcGateway {
        }
 
        /**
+        * Dispose the given savepoint.
+        *
+        * @param savepointPath identifying the savepoint to dispose
+        * @param timeout RPC timeout
+        * @return A future acknowledge if the disposal succeeded
+        */
+       default CompletableFuture<Acknowledge> disposeSavepoint(
+                       final String savepointPath,
+                       @RpcTimeout final Time timeout) {
+               throw new UnsupportedOperationException();
+       }
+
+       /**
         * Request the {@link JobStatus} of the given job.
         *
         * @param jobId identifying the job for which to retrieve the JobStatus

http://git-wip-us.apache.org/repos/asf/flink/blob/bd715c66/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index d4aa94e..af346a7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -64,6 +64,7 @@ import 
org.apache.flink.runtime.rest.handler.job.metrics.TaskManagerMetricsHandl
 import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingHandlers;
 import 
org.apache.flink.runtime.rest.handler.job.rescaling.RescalingStatusHeaders;
 import 
org.apache.flink.runtime.rest.handler.job.rescaling.RescalingTriggerHeaders;
+import 
org.apache.flink.runtime.rest.handler.job.savepoints.SavepointDisposalHandlers;
 import org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers;
 import org.apache.flink.runtime.rest.handler.legacy.ConstantTextHandler;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
@@ -108,6 +109,8 @@ import 
org.apache.flink.runtime.rest.messages.job.metrics.JobMetricsHeaders;
 import 
org.apache.flink.runtime.rest.messages.job.metrics.JobVertexMetricsHeaders;
 import 
org.apache.flink.runtime.rest.messages.job.metrics.SubtaskMetricsHeaders;
 import 
org.apache.flink.runtime.rest.messages.job.metrics.TaskManagerMetricsHeaders;
+import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalStatusHeaders;
+import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalTriggerHeaders;
 import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusHeaders;
 import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders;
 import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerDetailsHeaders;
@@ -499,6 +502,20 @@ public class WebMonitorEndpoint<T extends RestfulGateway> 
extends RestServerEndp
                        executor,
                        metricFetcher);
 
+               final SavepointDisposalHandlers savepointDisposalHandlers = new 
SavepointDisposalHandlers();
+
+               final SavepointDisposalHandlers.SavepointDisposalTriggerHandler 
savepointDisposalTriggerHandler = savepointDisposalHandlers.new 
SavepointDisposalTriggerHandler(
+                       restAddressFuture,
+                       leaderRetriever,
+                       timeout,
+                       responseHeaders);
+
+               final SavepointDisposalHandlers.SavepointDisposalStatusHandler 
savepointDisposalStatusHandler = savepointDisposalHandlers.new 
SavepointDisposalStatusHandler(
+                       restAddressFuture,
+                       leaderRetriever,
+                       timeout,
+                       responseHeaders);
+
                final Path webUiDir = restConfiguration.getWebUiDir();
 
                Optional<StaticFileServerHandler<T>> optWebContent;
@@ -549,6 +566,8 @@ public class WebMonitorEndpoint<T extends RestfulGateway> 
extends RestServerEndp
                handlers.add(Tuple2.of(JobVertexDetailsHeaders.getInstance(), 
jobVertexDetailsHandler));
                handlers.add(Tuple2.of(RescalingTriggerHeaders.getInstance(), 
rescalingTriggerHandler));
                handlers.add(Tuple2.of(RescalingStatusHeaders.getInstance(), 
rescalingStatusHandler));
+               
handlers.add(Tuple2.of(SavepointDisposalTriggerHeaders.getInstance(), 
savepointDisposalTriggerHandler));
+               
handlers.add(Tuple2.of(SavepointDisposalStatusHeaders.getInstance(), 
savepointDisposalStatusHandler));
 
                // TODO: Remove once the Yarn proxy can forward all REST verbs
                
handlers.add(Tuple2.of(YarnCancelJobTerminationHeaders.getInstance(), 
jobCancelTerminationHandler));

http://git-wip-us.apache.org/repos/asf/flink/blob/bd715c66/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 71c391f..8ea686b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -25,7 +25,9 @@ import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.checkpoint.Checkpoints;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ErrorInfo;
@@ -54,6 +56,11 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
+import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.CheckpointStorageLocation;
+import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
+import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.testutils.InMemorySubmittedJobGraphStore;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
@@ -72,9 +79,18 @@ import org.junit.rules.TemporaryFolder;
 import org.junit.rules.TestName;
 import org.mockito.Mockito;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
@@ -126,6 +142,8 @@ public class DispatcherTest extends TestLogger {
 
        private RunningJobsRegistry runningJobsRegistry;
 
+       private Configuration configuration;
+
        /** Instance under test. */
        private TestingDispatcher dispatcher;
 
@@ -165,18 +183,19 @@ public class DispatcherTest extends TestLogger {
                haServices.setResourceManagerLeaderRetriever(new 
SettableLeaderRetrievalService());
                runningJobsRegistry = haServices.getRunningJobsRegistry();
 
-               final Configuration blobServerConfig = new Configuration();
-               blobServerConfig.setString(
+               configuration = new Configuration();
+
+               configuration.setString(
                        BlobServerOptions.STORAGE_DIRECTORY,
                        temporaryFolder.newFolder().getAbsolutePath());
 
                dispatcher = new TestingDispatcher(
                        rpcService,
                        Dispatcher.DISPATCHER_NAME + '_' + name.getMethodName(),
-                       new Configuration(),
+                       configuration,
                        haServices,
                        mock(ResourceManagerGateway.class),
-                       new BlobServer(blobServerConfig, new VoidBlobStore()),
+                       new BlobServer(configuration, new VoidBlobStore()),
                        heartbeatServices,
                        
UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
                        null,
@@ -344,6 +363,42 @@ public class DispatcherTest extends TestLogger {
                assertThat(jobIds, contains(jobGraph.getJobID()));
        }
 
+       /**
+        * Tests that we can dispose a savepoint.
+        */
+       @Test
+       public void testSavepointDisposal() throws Exception {
+               final DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
+
+               
dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
+
+               final URI externalPointer = createTestingSavepoint();
+               final Path savepointPath = Paths.get(externalPointer);
+
+               assertThat(Files.exists(savepointPath), is(true));
+
+               dispatcherGateway.disposeSavepoint(externalPointer.toString(), 
TIMEOUT).get();
+
+               assertThat(Files.exists(savepointPath), is(false));
+       }
+
+       @Nonnull
+       private URI createTestingSavepoint() throws IOException, 
URISyntaxException {
+               final StateBackend stateBackend = 
Checkpoints.loadStateBackend(configuration, 
Thread.currentThread().getContextClassLoader(), log);
+               final CheckpointStorage checkpointStorage = 
stateBackend.createCheckpointStorage(jobGraph.getJobID());
+               final File savepointFile = temporaryFolder.newFolder();
+               final long checkpointId = 1L;
+
+               final CheckpointStorageLocation checkpointStorageLocation = 
checkpointStorage.initializeLocationForSavepoint(checkpointId, 
savepointFile.getAbsolutePath());
+
+               final CheckpointMetadataOutputStream metadataOutputStream = 
checkpointStorageLocation.createMetadataOutputStream();
+               Checkpoints.storeCheckpointMetadata(new 
SavepointV2(checkpointId, Collections.emptyList(), Collections.emptyList()), 
metadataOutputStream);
+
+               final CompletedCheckpointStorageLocation 
completedCheckpointStorageLocation = 
metadataOutputStream.closeAndFinalizeCheckpoint();
+
+               return new 
URI(completedCheckpointStorageLocation.getExternalPointer());
+       }
+
        private static class TestingDispatcher extends Dispatcher {
 
                private final CountDownLatch submitJobLatch = new 
CountDownLatch(2);

http://git-wip-us.apache.org/repos/asf/flink/blob/bd715c66/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointDisposalRequestTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointDisposalRequestTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointDisposalRequestTest.java
new file mode 100644
index 0000000..3d5a90a
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointDisposalRequestTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.savepoints;
+
+import org.apache.flink.runtime.rest.messages.RestRequestMarshallingTestBase;
+
+import org.hamcrest.Matchers;
+import org.junit.Assert;
+
+import java.util.UUID;
+
+/**
+ * Tests the un/marshalling of the {@link SavepointDisposalRequest}.
+ */
+public class SavepointDisposalRequestTest extends 
RestRequestMarshallingTestBase<SavepointDisposalRequest> {
+
+       @Override
+       protected Class<SavepointDisposalRequest> getTestRequestClass() {
+               return SavepointDisposalRequest.class;
+       }
+
+       @Override
+       protected SavepointDisposalRequest getTestRequestInstance() {
+               return new 
SavepointDisposalRequest(UUID.randomUUID().toString());
+       }
+
+       @Override
+       protected void 
assertOriginalEqualsToUnmarshalled(SavepointDisposalRequest expected, 
SavepointDisposalRequest actual) {
+               Assert.assertThat(actual.getSavepointPath(), 
Matchers.is(Matchers.equalTo(expected.getSavepointPath())));
+       }
+}

Reply via email to