[FLINK-8940] [flip6] Add support for dispose savepoint

Adds an AsynchronousOperationHandler for disposing savepoints. The handler is registered under '/savepoint-disposal' and requires a SavepointDisposalRequest JSON object containing the path to the savepoint to be disposed. The RestClusterClient polls the status registered under '/savepoint-disposal/:triggerId' until the operation has been completed.
This closes #5764. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bd715c66 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bd715c66 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bd715c66 Branch: refs/heads/master Commit: bd715c663c226b6f0042a20d768e57433aa97623 Parents: 19fa504 Author: Till Rohrmann <trohrm...@apache.org> Authored: Sun Mar 25 19:12:51 2018 +0200 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Tue Mar 27 08:34:42 2018 +0200 ---------------------------------------------------------------------- .../apache/flink/client/cli/CliFrontend.java | 3 +- .../flink/client/program/ClusterClient.java | 4 +- .../flink/client/program/MiniClusterClient.java | 4 +- .../client/program/rest/RestClusterClient.java | 38 +++++++ .../client/cli/CliFrontendSavepointTest.java | 17 ++- .../flink/client/program/ClusterClientTest.java | 4 +- .../program/rest/RestClusterClientTest.java | 107 ++++++++++++++++++ .../flink/runtime/checkpoint/Checkpoints.java | 11 +- .../flink/runtime/dispatcher/Dispatcher.java | 21 ++++ .../flink/runtime/minicluster/MiniCluster.java | 12 ++ .../rest/handler/async/OperationKey.java | 4 +- .../savepoints/SavepointDisposalHandlers.java | 112 +++++++++++++++++++ .../savepoints/SavepointDisposalRequest.java | 49 ++++++++ .../SavepointDisposalStatusHeaders.java | 75 +++++++++++++ ...avepointDisposalStatusMessageParameters.java | 46 ++++++++ .../SavepointDisposalTriggerHeaders.java | 67 +++++++++++ .../runtime/webmonitor/RestfulGateway.java | 13 +++ .../runtime/webmonitor/WebMonitorEndpoint.java | 19 ++++ .../runtime/dispatcher/DispatcherTest.java | 63 ++++++++++- .../SavepointDisposalRequestTest.java | 47 ++++++++ 20 files changed, 691 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/flink/blob/bd715c66/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java index d636ef7..a874891 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java @@ -47,7 +47,6 @@ import org.apache.flink.optimizer.plan.StreamingPlan; import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.client.JobStatusMessage; -import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.messages.Acknowledge; @@ -706,7 +705,7 @@ public class CliFrontend { logAndSysout("Disposing savepoint '" + savepointPath + "'."); - final CompletableFuture<Acknowledge> disposeFuture = clusterClient.disposeSavepoint(savepointPath, FutureUtils.toTime(clientTimeout)); + final CompletableFuture<Acknowledge> disposeFuture = clusterClient.disposeSavepoint(savepointPath); logAndSysout("Waiting for response..."); http://git-wip-us.apache.org/repos/asf/flink/blob/bd715c66/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java index f50206d..fbaa515 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java @@ -726,14 +726,14 
@@ public abstract class ClusterClient<T> { }); } - public CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath, Time timeout) throws FlinkException { + public CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath) throws FlinkException { final ActorGateway jobManager = getJobManagerGateway(); Object msg = new JobManagerMessages.DisposeSavepoint(savepointPath); CompletableFuture<Object> responseFuture = FutureUtils.<Object>toJava( jobManager.ask( msg, - FutureUtils.toFiniteDuration(timeout))); + timeout)); return responseFuture.thenApply( (Object response) -> { http://git-wip-us.apache.org/repos/asf/flink/blob/bd715c66/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java index 44f6ef6..802622e 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java @@ -123,8 +123,8 @@ public class MiniClusterClient extends ClusterClient<MiniClusterClient.MiniClust } @Override - public CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath, Time timeout) throws FlinkException { - throw new UnsupportedOperationException("MiniClusterClient does not yet support this operation."); + public CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath) throws FlinkException { + return guardWithSingleRetry(() -> miniCluster.disposeSavepoint(savepointPath), scheduledExecutor); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/bd715c66/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---------------------------------------------------------------------- diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java index 2e1ffb0..9129714 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java @@ -72,6 +72,10 @@ import org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders; import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders; import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody; import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalRequest; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalStatusHeaders; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalStatusMessageParameters; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalTriggerHeaders; import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointInfo; import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusHeaders; import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters; @@ -530,6 +534,40 @@ public class RestClusterClient<T> extends ClusterClient<T> { }); } + @Override + public CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath) { + final SavepointDisposalRequest savepointDisposalRequest = new SavepointDisposalRequest(savepointPath); + + final CompletableFuture<TriggerResponse> savepointDisposalTriggerFuture = sendRequest( + SavepointDisposalTriggerHeaders.getInstance(), + EmptyMessageParameters.getInstance(), + savepointDisposalRequest); + + final CompletableFuture<AsynchronousOperationInfo> savepointDisposalFuture = savepointDisposalTriggerFuture.thenCompose( + (TriggerResponse 
triggerResponse) -> { + final TriggerId triggerId = triggerResponse.getTriggerId(); + final SavepointDisposalStatusHeaders savepointDisposalStatusHeaders = SavepointDisposalStatusHeaders.getInstance(); + final SavepointDisposalStatusMessageParameters savepointDisposalStatusMessageParameters = savepointDisposalStatusHeaders.getUnresolvedMessageParameters(); + savepointDisposalStatusMessageParameters.triggerIdPathParameter.resolve(triggerId); + + return pollResourceAsync( + () -> sendRetryableRequest( + savepointDisposalStatusHeaders, + savepointDisposalStatusMessageParameters, + EmptyRequestBody.getInstance(), + isConnectionProblemException())); + }); + + return savepointDisposalFuture.thenApply( + (AsynchronousOperationInfo asynchronousOperationInfo) -> { + if (asynchronousOperationInfo.getFailureCause() == null) { + return Acknowledge.get(); + } else { + throw new CompletionException(asynchronousOperationInfo.getFailureCause()); + } + }); + } + /** * Creates a {@code CompletableFuture} that polls a {@code AsynchronouslyCreatedResource} until * its {@link AsynchronouslyCreatedResource#queueStatus() QueueStatus} becomes http://git-wip-us.apache.org/repos/asf/flink/blob/bd715c66/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendSavepointTest.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendSavepointTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendSavepointTest.java index 3195a6b..75f3b3d 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendSavepointTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendSavepointTest.java @@ -19,7 +19,6 @@ package org.apache.flink.client.cli; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; import org.apache.flink.client.cli.util.MockedCliFrontend; import 
org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.StandaloneClusterClient; @@ -41,7 +40,7 @@ import java.io.File; import java.io.FileOutputStream; import java.io.PrintStream; import java.util.concurrent.CompletableFuture; -import java.util.function.BiFunction; +import java.util.function.Function; import java.util.zip.ZipOutputStream; import static org.junit.Assert.assertEquals; @@ -196,7 +195,7 @@ public class CliFrontendSavepointTest extends CliFrontendTestBase { String savepointPath = "expectedSavepointPath"; ClusterClient clusterClient = new DisposeSavepointClusterClient( - (String path, Time timeout) -> CompletableFuture.completedFuture(Acknowledge.get()), getConfiguration()); + (String path) -> CompletableFuture.completedFuture(Acknowledge.get()), getConfiguration()); try { @@ -225,7 +224,7 @@ public class CliFrontendSavepointTest extends CliFrontendTestBase { final CompletableFuture<String> disposeSavepointFuture = new CompletableFuture<>(); final DisposeSavepointClusterClient clusterClient = new DisposeSavepointClusterClient( - (String savepointPath, Time timeout) -> { + (String savepointPath) -> { disposeSavepointFuture.complete(savepointPath); return CompletableFuture.completedFuture(Acknowledge.get()); }, getConfiguration()); @@ -260,7 +259,7 @@ public class CliFrontendSavepointTest extends CliFrontendTestBase { Exception testException = new Exception("expectedTestException"); - DisposeSavepointClusterClient clusterClient = new DisposeSavepointClusterClient((String path, Time timeout) -> FutureUtils.completedExceptionally(testException), getConfiguration()); + DisposeSavepointClusterClient clusterClient = new DisposeSavepointClusterClient((String path) -> FutureUtils.completedExceptionally(testException), getConfiguration()); try { CliFrontend frontend = new MockedCliFrontend(clusterClient); @@ -285,17 +284,17 @@ public class CliFrontendSavepointTest extends CliFrontendTestBase { private static final class 
DisposeSavepointClusterClient extends StandaloneClusterClient { - private final BiFunction<String, Time, CompletableFuture<Acknowledge>> disposeSavepointFunction; + private final Function<String, CompletableFuture<Acknowledge>> disposeSavepointFunction; - DisposeSavepointClusterClient(BiFunction<String, Time, CompletableFuture<Acknowledge>> disposeSavepointFunction, Configuration configuration) { + DisposeSavepointClusterClient(Function<String, CompletableFuture<Acknowledge>> disposeSavepointFunction, Configuration configuration) { super(configuration, new TestingHighAvailabilityServices(), false); this.disposeSavepointFunction = Preconditions.checkNotNull(disposeSavepointFunction); } @Override - public CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath, Time timeout) { - return disposeSavepointFunction.apply(savepointPath, timeout); + public CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath) { + return disposeSavepointFunction.apply(savepointPath); } } http://git-wip-us.apache.org/repos/asf/flink/blob/bd715c66/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java index f30fd19..07b3821 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java @@ -181,7 +181,7 @@ public class ClusterClientTest extends TestLogger { final TestClusterClient clusterClient = new TestClusterClient(configuration, jobManagerGateway); - CompletableFuture<Acknowledge> acknowledgeCompletableFuture = clusterClient.disposeSavepoint(savepointPath, timeout); + CompletableFuture<Acknowledge> acknowledgeCompletableFuture = 
clusterClient.disposeSavepoint(savepointPath); try { acknowledgeCompletableFuture.get(); @@ -203,7 +203,7 @@ public class ClusterClientTest extends TestLogger { final TestClusterClient clusterClient = new TestClusterClient(configuration, jobManagerGateway); - CompletableFuture<Acknowledge> acknowledgeCompletableFuture = clusterClient.disposeSavepoint(savepointPath, timeout); + CompletableFuture<Acknowledge> acknowledgeCompletableFuture = clusterClient.disposeSavepoint(savepointPath); try { acknowledgeCompletableFuture.get(); http://git-wip-us.apache.org/repos/asf/flink/blob/bd715c66/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java index 77a4113..926da92 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java @@ -35,6 +35,7 @@ import org.apache.flink.runtime.dispatcher.DispatcherGateway; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobmaster.JobResult; +import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; import org.apache.flink.runtime.rest.RestClient; @@ -45,6 +46,7 @@ import org.apache.flink.runtime.rest.handler.AbstractRestHandler; import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.RestHandlerException; import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; +import 
org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo; import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult; import org.apache.flink.runtime.rest.handler.async.TriggerResponse; import org.apache.flink.runtime.rest.messages.AccumulatorsIncludeSerializedValueQueryParameter; @@ -72,6 +74,10 @@ import org.apache.flink.runtime.rest.messages.job.JobExecutionResultResponseBody import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders; import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody; import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalRequest; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalStatusHeaders; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalStatusMessageParameters; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalTriggerHeaders; import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointInfo; import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusHeaders; import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters; @@ -84,7 +90,9 @@ import org.apache.flink.runtime.util.ExecutorThreadFactory; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import org.apache.flink.testutils.category.Flip6; import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkException; import org.apache.flink.util.OptionalFailure; +import org.apache.flink.util.Preconditions; import org.apache.flink.util.SerializedThrowable; import org.apache.flink.util.SerializedValue; import org.apache.flink.util.TestLogger; @@ -103,6 +111,7 @@ import org.mockito.MockitoAnnotations; import javax.annotation.Nonnull; import java.io.IOException; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; import 
java.util.Collection; @@ -112,6 +121,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Queue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -537,6 +547,103 @@ public class RestClusterClientTest extends TestLogger { } @Test + public void testDisposeSavepoint() throws Exception { + final String savepointPath = "foobar"; + final String exceptionMessage = "Test exception."; + final FlinkException testException = new FlinkException(exceptionMessage); + + final TestSavepointDisposalHandlers testSavepointDisposalHandlers = new TestSavepointDisposalHandlers(savepointPath); + final TestSavepointDisposalHandlers.TestSavepointDisposalTriggerHandler testSavepointDisposalTriggerHandler = testSavepointDisposalHandlers.new TestSavepointDisposalTriggerHandler(); + final TestSavepointDisposalHandlers.TestSavepointDisposalStatusHandler testSavepointDisposalStatusHandler = testSavepointDisposalHandlers.new TestSavepointDisposalStatusHandler( + OptionalFailure.of(AsynchronousOperationInfo.complete()), + OptionalFailure.of(AsynchronousOperationInfo.completeExceptional(new SerializedThrowable(testException))), + OptionalFailure.ofFailure(testException)); + + try (TestRestServerEndpoint ignored = createRestServerEndpoint( + testSavepointDisposalStatusHandler, + testSavepointDisposalTriggerHandler)) { + { + final CompletableFuture<Acknowledge> disposeSavepointFuture = restClusterClient.disposeSavepoint(savepointPath); + assertThat(disposeSavepointFuture.get(), is(Acknowledge.get())); + } + + { + final CompletableFuture<Acknowledge> disposeSavepointFuture = restClusterClient.disposeSavepoint(savepointPath); + + try { + disposeSavepointFuture.get(); + fail("Expected an exception"); + } catch (ExecutionException ee) { + assertThat(ExceptionUtils.findThrowableWithMessage(ee, exceptionMessage).isPresent(), is(true)); + } + } + + 
{ + try { + restClusterClient.disposeSavepoint(savepointPath).get(); + fail("Expected an exception."); + } catch (ExecutionException ee) { + assertThat(ExceptionUtils.findThrowable(ee, RestClientException.class).isPresent(), is(true)); + } + } + } + } + + private class TestSavepointDisposalHandlers { + + private final TriggerId triggerId = new TriggerId(); + + private final String savepointPath; + + private TestSavepointDisposalHandlers(String savepointPath) { + this.savepointPath = Preconditions.checkNotNull(savepointPath); + } + + private class TestSavepointDisposalTriggerHandler extends TestHandler<SavepointDisposalRequest, TriggerResponse, EmptyMessageParameters> { + private TestSavepointDisposalTriggerHandler() { + super(SavepointDisposalTriggerHeaders.getInstance()); + } + + @Override + protected CompletableFuture<TriggerResponse> handleRequest(@Nonnull HandlerRequest<SavepointDisposalRequest, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) { + assertThat(request.getRequestBody().getSavepointPath(), is(savepointPath)); + return CompletableFuture.completedFuture(new TriggerResponse(triggerId)); + } + } + + private class TestSavepointDisposalStatusHandler extends TestHandler<EmptyRequestBody, AsynchronousOperationResult<AsynchronousOperationInfo>, SavepointDisposalStatusMessageParameters> { + + private final Queue<OptionalFailure<AsynchronousOperationInfo>> responses; + + private TestSavepointDisposalStatusHandler(OptionalFailure<AsynchronousOperationInfo>... 
responses) { + super(SavepointDisposalStatusHeaders.getInstance()); + this.responses = new ArrayDeque<>(Arrays.asList(responses)); + } + + @Override + protected CompletableFuture<AsynchronousOperationResult<AsynchronousOperationInfo>> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, SavepointDisposalStatusMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException { + final TriggerId actualTriggerId = request.getPathParameter(TriggerIdPathParameter.class); + + if (actualTriggerId.equals(triggerId)) { + final OptionalFailure<AsynchronousOperationInfo> nextResponse = responses.poll(); + + if (nextResponse != null) { + if (nextResponse.isFailure()) { + throw new RestHandlerException("Failure", HttpResponseStatus.BAD_REQUEST, nextResponse.getFailureCause()); + } else { + return CompletableFuture.completedFuture(AsynchronousOperationResult.completed(nextResponse.getUnchecked())); + } + } else { + throw new AssertionError(); + } + } else { + throw new AssertionError(); + } + } + } + } + + @Test public void testListJobs() throws Exception { try (TestRestServerEndpoint ignored = createRestServerEndpoint(new TestListJobsHandler())) { { http://git-wip-us.apache.org/repos/asf/flink/blob/bd715c66/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java index 72b7c53..60fdc17 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java @@ -38,6 +38,7 @@ import org.apache.flink.util.FlinkException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.DataInputStream; @@ 
-292,6 +293,13 @@ public class Checkpoints { checkNotNull(configuration, "configuration"); checkNotNull(classLoader, "classLoader"); + StateBackend backend = loadStateBackend(configuration, classLoader, logger); + + disposeSavepoint(pointer, backend, classLoader); + } + + @Nonnull + public static StateBackend loadStateBackend(Configuration configuration, ClassLoader classLoader, @Nullable Logger logger) { if (logger != null) { logger.info("Attempting to load configured state backend for savepoint disposal"); } @@ -318,8 +326,7 @@ public class Checkpoints { // FileSystem-based for metadata backend = new MemoryStateBackend(); } - - disposeSavepoint(pointer, backend, classLoader); + return backend; } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/bd715c66/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index 68b4046..008d4dc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.checkpoint.Checkpoints; import org.apache.flink.runtime.client.JobSubmissionException; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.FutureUtils; @@ -75,6 +76,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import 
java.util.concurrent.CompletionException; import java.util.stream.Collectors; /** @@ -320,6 +322,25 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme } @Override + public CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath, Time timeout) { + final ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + + return CompletableFuture.supplyAsync( + () -> { + log.info("Disposing savepoint {}.", savepointPath); + + try { + Checkpoints.disposeSavepoint(savepointPath, configuration, classLoader, log); + } catch (IOException | FlinkException e) { + throw new CompletionException(new FlinkException(String.format("Could not dispose savepoint %s.", savepointPath), e)); + } + + return Acknowledge.get(); + }, + jobManagerSharedServices.getScheduledExecutorService()); + } + + @Override public CompletableFuture<Acknowledge> cancelJob(JobID jobId, Time timeout) { JobManagerRunner jobManagerRunner = jobManagerRunners.get(jobId); http://git-wip-us.apache.org/repos/asf/flink/blob/bd715c66/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index 0da6f33..66770c5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -534,6 +534,18 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync { } } + public CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath) { + try { + return getDispatcherGateway().disposeSavepoint(savepointPath, rpcTimeout); + } catch (LeaderRetrievalException | InterruptedException e) { + ExceptionUtils.checkInterrupted(e); + return 
FutureUtils.completedExceptionally( + new FlinkException( + String.format("Could not dispose savepoint %s.", savepointPath), + e)); + } + } + public CompletableFuture<? extends AccessExecutionGraph> getExecutionGraph(JobID jobId) { try { return getDispatcherGateway().requestJob(jobId, rpcTimeout); http://git-wip-us.apache.org/repos/asf/flink/blob/bd715c66/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/OperationKey.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/OperationKey.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/OperationKey.java index a601e56..2f6e4bd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/OperationKey.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/OperationKey.java @@ -27,11 +27,11 @@ import java.util.Objects; * Any operation key for the {@link AbstractAsynchronousOperationHandlers} must extend this class. * It is used to store the trigger id. 
*/ -public abstract class OperationKey { +public class OperationKey { private final TriggerId triggerId; - protected OperationKey(TriggerId triggerId) { + public OperationKey(TriggerId triggerId) { this.triggerId = Preconditions.checkNotNull(triggerId); } http://git-wip-us.apache.org/repos/asf/flink/blob/bd715c66/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointDisposalHandlers.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointDisposalHandlers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointDisposalHandlers.java new file mode 100644 index 0000000..3cf5f59 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointDisposalHandlers.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.handler.job.savepoints; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.async.AbstractAsynchronousOperationHandlers; +import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo; +import org.apache.flink.runtime.rest.handler.async.OperationKey; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.TriggerId; +import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalRequest; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalStatusHeaders; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalStatusMessageParameters; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalTriggerHeaders; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.SerializedThrowable; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** + * Handlers to trigger the disposal of a savepoint. + */ +public class SavepointDisposalHandlers extends AbstractAsynchronousOperationHandlers<OperationKey, Acknowledge> { + + /** + * {@link TriggerHandler} implementation for the savepoint disposal operation. + */ + public class SavepointDisposalTriggerHandler extends TriggerHandler<RestfulGateway, SavepointDisposalRequest, EmptyMessageParameters> { + + public SavepointDisposalTriggerHandler( + CompletableFuture<String> localRestAddress, + GatewayRetriever<? 
extends RestfulGateway> leaderRetriever, + Time timeout, + Map<String, String> responseHeaders) { + super( + localRestAddress, + leaderRetriever, + timeout, + responseHeaders, + SavepointDisposalTriggerHeaders.getInstance()); + } + + @Override + protected CompletableFuture<Acknowledge> triggerOperation(HandlerRequest<SavepointDisposalRequest, EmptyMessageParameters> request, RestfulGateway gateway) { + final String savepointPath = request.getRequestBody().getSavepointPath(); + return gateway.disposeSavepoint(savepointPath, RpcUtils.INF_TIMEOUT); + } + + @Override + protected OperationKey createOperationKey(HandlerRequest<SavepointDisposalRequest, EmptyMessageParameters> request) { + return new OperationKey(new TriggerId()); + } + } + + /** + * {@link StatusHandler} implementation for the savepoint disposal operation. + */ + public class SavepointDisposalStatusHandler extends StatusHandler<RestfulGateway, AsynchronousOperationInfo, SavepointDisposalStatusMessageParameters> { + + public SavepointDisposalStatusHandler( + CompletableFuture<String> localRestAddress, + GatewayRetriever<? 
extends RestfulGateway> leaderRetriever, + Time timeout, + Map<String, String> responseHeaders) { + super( + localRestAddress, + leaderRetriever, + timeout, + responseHeaders, + SavepointDisposalStatusHeaders.getInstance()); + } + + @Override + protected OperationKey getOperationKey(HandlerRequest<EmptyRequestBody, SavepointDisposalStatusMessageParameters> request) { + final TriggerId triggerId = request.getPathParameter(TriggerIdPathParameter.class); + return new OperationKey(triggerId); + } + + @Override + protected AsynchronousOperationInfo exceptionalOperationResultResponse(Throwable throwable) { + return AsynchronousOperationInfo.completeExceptional(new SerializedThrowable(throwable)); + } + + @Override + protected AsynchronousOperationInfo operationResultResponse(Acknowledge operationResult) { + return AsynchronousOperationInfo.complete(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/bd715c66/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointDisposalRequest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointDisposalRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointDisposalRequest.java new file mode 100644 index 0000000..229ae91 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointDisposalRequest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job.savepoints; + +import org.apache.flink.runtime.rest.messages.RequestBody; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import javax.annotation.Nonnull; + +/** + * Request body for a savepoint disposal call. + */ +public class SavepointDisposalRequest implements RequestBody { + + private static final String FIELD_NAME_SAVEPOINT_PATH = "savepoint-path"; + + @JsonProperty(FIELD_NAME_SAVEPOINT_PATH) + private final String savepointPath; + + @JsonCreator + public SavepointDisposalRequest( + @JsonProperty(FIELD_NAME_SAVEPOINT_PATH) @Nonnull String savepointPath) { + this.savepointPath = savepointPath; + } + + @JsonIgnore + public String getSavepointPath() { + return savepointPath; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/bd715c66/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointDisposalStatusHeaders.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointDisposalStatusHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointDisposalStatusHeaders.java new file mode 100644 index 0000000..74deffd --- /dev/null +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointDisposalStatusHeaders.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job.savepoints; + +import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo; +import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationStatusMessageHeaders; +import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationTriggerMessageHeaders; +import org.apache.flink.runtime.rest.handler.job.savepoints.SavepointDisposalHandlers; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +/** + * {@link AsynchronousOperationStatusMessageHeaders} implementation for the {@link SavepointDisposalHandlers.SavepointDisposalStatusHandler}. 
+ */ +public class SavepointDisposalStatusHeaders extends AsynchronousOperationStatusMessageHeaders<AsynchronousOperationInfo, SavepointDisposalStatusMessageParameters> { + + private static final SavepointDisposalStatusHeaders INSTANCE = new SavepointDisposalStatusHeaders(); + + private static final String URL = String.format("/savepoint-disposal/:%s", TriggerIdPathParameter.KEY); + + private SavepointDisposalStatusHeaders() {} + + @Override + public HttpResponseStatus getResponseStatusCode() { + return HttpResponseStatus.OK; + } + + @Override + public Class<EmptyRequestBody> getRequestClass() { + return EmptyRequestBody.class; + } + + @Override + public SavepointDisposalStatusMessageParameters getUnresolvedMessageParameters() { + return new SavepointDisposalStatusMessageParameters(); + } + + @Override + public HttpMethodWrapper getHttpMethod() { + return HttpMethodWrapper.GET; + } + + @Override + public String getTargetRestEndpointURL() { + return URL; + } + + public static SavepointDisposalStatusHeaders getInstance() { + return INSTANCE; + } + + @Override + protected Class<AsynchronousOperationInfo> getValueClass() { + return AsynchronousOperationInfo.class; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/bd715c66/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointDisposalStatusMessageParameters.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointDisposalStatusMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointDisposalStatusMessageParameters.java new file mode 100644 index 0000000..d8804c9 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointDisposalStatusMessageParameters.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or 
more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job.savepoints; + +import org.apache.flink.runtime.rest.handler.job.savepoints.SavepointDisposalHandlers; +import org.apache.flink.runtime.rest.messages.MessageParameters; +import org.apache.flink.runtime.rest.messages.MessagePathParameter; +import org.apache.flink.runtime.rest.messages.MessageQueryParameter; +import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter; + +import java.util.Collection; +import java.util.Collections; + +/** + * {@link MessageParameters} for the {@link SavepointDisposalHandlers.SavepointDisposalStatusHandler}. 
+ */ +public class SavepointDisposalStatusMessageParameters extends MessageParameters { + + public final TriggerIdPathParameter triggerIdPathParameter = new TriggerIdPathParameter(); + + @Override + public Collection<MessagePathParameter<?>> getPathParameters() { + return Collections.singleton(triggerIdPathParameter); + } + + @Override + public Collection<MessageQueryParameter<?>> getQueryParameters() { + return Collections.emptyList(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/bd715c66/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointDisposalTriggerHeaders.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointDisposalTriggerHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointDisposalTriggerHeaders.java new file mode 100644 index 0000000..5786498 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointDisposalTriggerHeaders.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.messages.job.savepoints; + +import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationTriggerMessageHeaders; +import org.apache.flink.runtime.rest.handler.job.savepoints.SavepointDisposalHandlers; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +/** + * {@link AsynchronousOperationTriggerMessageHeaders} for the {@link SavepointDisposalHandlers.SavepointDisposalTriggerHandler}. + */ +public class SavepointDisposalTriggerHeaders extends AsynchronousOperationTriggerMessageHeaders<SavepointDisposalRequest, EmptyMessageParameters> { + + private static final SavepointDisposalTriggerHeaders INSTANCE = new SavepointDisposalTriggerHeaders(); + + private static final String URL = "/savepoint-disposal"; + + private SavepointDisposalTriggerHeaders() {} + + @Override + public HttpResponseStatus getResponseStatusCode() { + return HttpResponseStatus.OK; + } + + @Override + public Class<SavepointDisposalRequest> getRequestClass() { + return SavepointDisposalRequest.class; + } + + @Override + public EmptyMessageParameters getUnresolvedMessageParameters() { + return EmptyMessageParameters.getInstance(); + } + + @Override + public HttpMethodWrapper getHttpMethod() { + return HttpMethodWrapper.POST; + } + + @Override + public String getTargetRestEndpointURL() { + return URL; + } + + public static SavepointDisposalTriggerHeaders getInstance() { + return INSTANCE; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/bd715c66/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java 
index 4714206..6bb088c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java @@ -145,6 +145,19 @@ public interface RestfulGateway extends RpcGateway { } /** + * Dispose the given savepoint. + * + * @param savepointPath identifying the savepoint to dispose + * @param timeout RPC timeout + * @return Future which is completed with an {@link Acknowledge} once the disposal has succeeded + */ + default CompletableFuture<Acknowledge> disposeSavepoint( + final String savepointPath, + @RpcTimeout final Time timeout) { + throw new UnsupportedOperationException(); + } + + /** * Request the {@link JobStatus} of the given job. * * @param jobId identifying the job for which to retrieve the JobStatus http://git-wip-us.apache.org/repos/asf/flink/blob/bd715c66/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java index d4aa94e..af346a7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java @@ -64,6 +64,7 @@ import org.apache.flink.runtime.rest.handler.job.metrics.TaskManagerMetricsHandl import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingHandlers; import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingStatusHeaders; import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingTriggerHeaders; +import org.apache.flink.runtime.rest.handler.job.savepoints.SavepointDisposalHandlers; import org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers; import org.apache.flink.runtime.rest.handler.legacy.ConstantTextHandler; 
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; @@ -108,6 +109,8 @@ import org.apache.flink.runtime.rest.messages.job.metrics.JobMetricsHeaders; import org.apache.flink.runtime.rest.messages.job.metrics.JobVertexMetricsHeaders; import org.apache.flink.runtime.rest.messages.job.metrics.SubtaskMetricsHeaders; import org.apache.flink.runtime.rest.messages.job.metrics.TaskManagerMetricsHeaders; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalStatusHeaders; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalTriggerHeaders; import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusHeaders; import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders; import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerDetailsHeaders; @@ -499,6 +502,20 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp executor, metricFetcher); + final SavepointDisposalHandlers savepointDisposalHandlers = new SavepointDisposalHandlers(); + + final SavepointDisposalHandlers.SavepointDisposalTriggerHandler savepointDisposalTriggerHandler = savepointDisposalHandlers.new SavepointDisposalTriggerHandler( + restAddressFuture, + leaderRetriever, + timeout, + responseHeaders); + + final SavepointDisposalHandlers.SavepointDisposalStatusHandler savepointDisposalStatusHandler = savepointDisposalHandlers.new SavepointDisposalStatusHandler( + restAddressFuture, + leaderRetriever, + timeout, + responseHeaders); + final Path webUiDir = restConfiguration.getWebUiDir(); Optional<StaticFileServerHandler<T>> optWebContent; @@ -549,6 +566,8 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp handlers.add(Tuple2.of(JobVertexDetailsHeaders.getInstance(), jobVertexDetailsHandler)); handlers.add(Tuple2.of(RescalingTriggerHeaders.getInstance(), rescalingTriggerHandler)); 
handlers.add(Tuple2.of(RescalingStatusHeaders.getInstance(), rescalingStatusHandler)); + handlers.add(Tuple2.of(SavepointDisposalTriggerHeaders.getInstance(), savepointDisposalTriggerHandler)); + handlers.add(Tuple2.of(SavepointDisposalStatusHeaders.getInstance(), savepointDisposalStatusHandler)); // TODO: Remove once the Yarn proxy can forward all REST verbs handlers.add(Tuple2.of(YarnCancelJobTerminationHeaders.getInstance(), jobCancelTerminationHandler)); http://git-wip-us.apache.org/repos/asf/flink/blob/bd715c66/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java index 71c391f..8ea686b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java @@ -25,7 +25,9 @@ import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.blob.VoidBlobStore; +import org.apache.flink.runtime.checkpoint.Checkpoints; import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; +import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.executiongraph.ErrorInfo; @@ -54,6 +56,11 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.state.CheckpointMetadataOutputStream; 
+import org.apache.flink.runtime.state.CheckpointStorage; +import org.apache.flink.runtime.state.CheckpointStorageLocation; +import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation; +import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.runtime.testutils.InMemorySubmittedJobGraphStore; import org.apache.flink.runtime.util.TestingFatalErrorHandler; @@ -72,9 +79,18 @@ import org.junit.rules.TemporaryFolder; import org.junit.rules.TestName; import org.mockito.Mockito; +import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.Collection; +import java.util.Collections; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; @@ -126,6 +142,8 @@ public class DispatcherTest extends TestLogger { private RunningJobsRegistry runningJobsRegistry; + private Configuration configuration; + /** Instance under test. 
*/ private TestingDispatcher dispatcher; @@ -165,18 +183,19 @@ public class DispatcherTest extends TestLogger { haServices.setResourceManagerLeaderRetriever(new SettableLeaderRetrievalService()); runningJobsRegistry = haServices.getRunningJobsRegistry(); - final Configuration blobServerConfig = new Configuration(); - blobServerConfig.setString( + configuration = new Configuration(); + + configuration.setString( BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); dispatcher = new TestingDispatcher( rpcService, Dispatcher.DISPATCHER_NAME + '_' + name.getMethodName(), - new Configuration(), + configuration, haServices, mock(ResourceManagerGateway.class), - new BlobServer(blobServerConfig, new VoidBlobStore()), + new BlobServer(configuration, new VoidBlobStore()), heartbeatServices, UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(), null, @@ -344,6 +363,42 @@ public class DispatcherTest extends TestLogger { assertThat(jobIds, contains(jobGraph.getJobID())); } + /** + * Tests that we can dispose a savepoint. 
+ */ + @Test + public void testSavepointDisposal() throws Exception { + final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); + + dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get(); + + final URI externalPointer = createTestingSavepoint(); + final Path savepointPath = Paths.get(externalPointer); + + assertThat(Files.exists(savepointPath), is(true)); + + dispatcherGateway.disposeSavepoint(externalPointer.toString(), TIMEOUT).get(); + + assertThat(Files.exists(savepointPath), is(false)); + } + + @Nonnull + private URI createTestingSavepoint() throws IOException, URISyntaxException { + final StateBackend stateBackend = Checkpoints.loadStateBackend(configuration, Thread.currentThread().getContextClassLoader(), log); + final CheckpointStorage checkpointStorage = stateBackend.createCheckpointStorage(jobGraph.getJobID()); + final File savepointFile = temporaryFolder.newFolder(); + final long checkpointId = 1L; + + final CheckpointStorageLocation checkpointStorageLocation = checkpointStorage.initializeLocationForSavepoint(checkpointId, savepointFile.getAbsolutePath()); + + final CheckpointMetadataOutputStream metadataOutputStream = checkpointStorageLocation.createMetadataOutputStream(); + Checkpoints.storeCheckpointMetadata(new SavepointV2(checkpointId, Collections.emptyList(), Collections.emptyList()), metadataOutputStream); + + final CompletedCheckpointStorageLocation completedCheckpointStorageLocation = metadataOutputStream.closeAndFinalizeCheckpoint(); + + return new URI(completedCheckpointStorageLocation.getExternalPointer()); + } + private static class TestingDispatcher extends Dispatcher { private final CountDownLatch submitJobLatch = new CountDownLatch(2); http://git-wip-us.apache.org/repos/asf/flink/blob/bd715c66/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointDisposalRequestTest.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointDisposalRequestTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointDisposalRequestTest.java new file mode 100644 index 0000000..3d5a90a --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointDisposalRequestTest.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job.savepoints; + +import org.apache.flink.runtime.rest.messages.RestRequestMarshallingTestBase; + +import org.hamcrest.Matchers; +import org.junit.Assert; + +import java.util.UUID; + +/** + * Tests the un/marshalling of the {@link SavepointDisposalRequest}. 
+ */ +public class SavepointDisposalRequestTest extends RestRequestMarshallingTestBase<SavepointDisposalRequest> { + + @Override + protected Class<SavepointDisposalRequest> getTestRequestClass() { + return SavepointDisposalRequest.class; + } + + @Override + protected SavepointDisposalRequest getTestRequestInstance() { + return new SavepointDisposalRequest(UUID.randomUUID().toString()); + } + + @Override + protected void assertOriginalEqualsToUnmarshalled(SavepointDisposalRequest expected, SavepointDisposalRequest actual) { + Assert.assertThat(actual.getSavepointPath(), Matchers.is(Matchers.equalTo(expected.getSavepointPath()))); + } +}