Repository: flink Updated Branches: refs/heads/master e9b20ec21 -> 3bc9cad04
[FLINK-4787] [runtime-web] Add JobCancellationWithSavepointHandlers - Add handlers for triggering and monitoring job cancellation with savepoints. This closes #2626. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3bc9cad0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3bc9cad0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3bc9cad0 Branch: refs/heads/master Commit: 3bc9cad045b25d413f0b9f054fff12fac18a4f0e Parents: 2fb6009 Author: Ufuk Celebi <[email protected]> Authored: Tue Oct 11 10:09:20 2016 +0200 Committer: Ufuk Celebi <[email protected]> Committed: Fri Oct 28 11:04:12 2016 +0200 ---------------------------------------------------------------------- docs/monitoring/rest_api.md | 70 +++- .../runtime/webmonitor/WebRuntimeMonitor.java | 12 + .../JobCancellationWithSavepointHandlers.java | 420 +++++++++++++++++++ ...obCancellationWithSavepointHandlersTest.java | 314 ++++++++++++++ .../metrics/AbstractMetricsHandlerTest.java | 8 +- .../checkpoint/CheckpointCoordinator.java | 4 + .../executiongraph/AccessExecutionGraph.java | 9 + .../executiongraph/ArchivedExecutionGraph.java | 6 + .../runtime/executiongraph/ExecutionGraph.java | 1 + 9 files changed, 839 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3bc9cad0/docs/monitoring/rest_api.md ---------------------------------------------------------------------- diff --git a/docs/monitoring/rest_api.md b/docs/monitoring/rest_api.md index e7fccc5..e84e2cc 100644 --- a/docs/monitoring/rest_api.md +++ b/docs/monitoring/rest_api.md @@ -31,7 +31,6 @@ The monitoring API is a REST-ful API that accepts HTTP GET requests and responds {:toc} - ## Overview The monitoring API is backed by a web server that runs as part of the *JobManager*. 
By default, this server listens at port `8081`, which can be configured in `flink-conf.yaml` via `jobmanager.web.port`. Note that the monitoring API web server and the web dashboard web server are currently the same and thus run together at the same port. They respond to different HTTP URLs, though. @@ -584,3 +583,72 @@ Sample Result: } ] } ~~~ + +### Job Cancellation + +#### Cancel Job + +`DELETE` request to **`/jobs/:jobid/cancel`**. + +Triggers job cancellation, result on success is `{}`. + +#### Cancel Job with Savepoint + +Triggers a savepoint and cancels the job after the savepoint succeeds. + +`GET` request to **`/jobs/:jobid/cancel-with-savepoint/`** triggers a savepoint to the default savepoint directory and cancels the job. + +`GET` request to **`/jobs/:jobid/cancel-with-savepoint/target-directory/:targetDirectory`** triggers a savepoint to the given target directory and cancels the job. + +Since savepoints can take some time to complete, this operation happens asynchronously. The result of this request is the location of the in-progress cancellation. + +Sample Trigger Result: + +~~~ +{ + "status": "accepted", + "request-id": 1, + "location": "/jobs/:jobid/cancel-with-savepoint/in-progress/1" +} +~~~ + +##### Monitoring Progress + +The progress of the cancellation has to be monitored by the user at + +~~~ +/jobs/:jobid/cancel-with-savepoint/in-progress/:requestId +~~~ + +The request ID is returned by the trigger result. + +###### In-Progress + +~~~ +{ + "status": "in-progress", + "request-id": 1 +} +~~~ + +###### Success + +~~~ +{ + "status": "success", + "request-id": 1, + "savepoint-path": "<savepointPath>" +} +~~~ + +The `savepointPath` points to the external path of the savepoint, which can be used to resume the job from the savepoint. 
+ +###### Failed + +~~~ +{ + "status": "failed", + "request-id": 1, + "cause": "<error message>" +} +~~~ http://git-wip-us.apache.org/repos/asf/flink/blob/3bc9cad0/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java index 3e2634f..e907124 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java @@ -50,6 +50,7 @@ import org.apache.flink.runtime.webmonitor.handlers.JarRunHandler; import org.apache.flink.runtime.webmonitor.handlers.JarUploadHandler; import org.apache.flink.runtime.webmonitor.handlers.JobAccumulatorsHandler; import org.apache.flink.runtime.webmonitor.handlers.JobCancellationHandler; +import org.apache.flink.runtime.webmonitor.handlers.JobCancellationWithSavepointHandlers; import org.apache.flink.runtime.webmonitor.handlers.JobCheckpointsHandler; import org.apache.flink.runtime.webmonitor.handlers.JobConfigHandler; import org.apache.flink.runtime.webmonitor.handlers.JobDetailsHandler; @@ -238,6 +239,12 @@ public class WebRuntimeMonitor implements WebMonitor { } metricFetcher = new MetricFetcher(actorSystem, retriever, context); + String defaultSavepointDir = config.getString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY, null); + + JobCancellationWithSavepointHandlers cancelWithSavepoint = new JobCancellationWithSavepointHandlers(currentGraphs, context, defaultSavepointDir); + RuntimeMonitorHandler triggerHandler = handler(cancelWithSavepoint.getTriggerHandler()); + RuntimeMonitorHandler inProgressHandler = handler(cancelWithSavepoint.getInProgressHandler()); + router = new Router() // config how to interact with 
this web server .GET("/config", handler(new DashboardConfigHandler(cfg.getRefreshInterval()))) @@ -307,6 +314,10 @@ public class WebRuntimeMonitor implements WebMonitor { // DELETE is the preferred way of canceling a job (Rest-conform) .DELETE("/jobs/:jobid/cancel", handler(new JobCancellationHandler())) + .GET("/jobs/:jobid/cancel-with-savepoint", triggerHandler) + .GET("/jobs/:jobid/cancel-with-savepoint/target-directory/:targetDirectory", triggerHandler) + .GET(JobCancellationWithSavepointHandlers.IN_PROGRESS_URL, inProgressHandler) + // stop a job via GET (for proper integration with YARN this has to be performed via GET) .GET("/jobs/:jobid/yarn-stop", handler(new JobStoppingHandler())) @@ -489,6 +500,7 @@ public class WebRuntimeMonitor implements WebMonitor { // ------------------------------------------------------------------------ // Utilities // ------------------------------------------------------------------------ + private RuntimeMonitorHandler handler(RequestHandler handler) { return new RuntimeMonitorHandler(handler, retriever, jobManagerAddressPromise.future(), timeout, serverSSLContext != null); http://git-wip-us.apache.org/repos/asf/flink/blob/3bc9cad0/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java new file mode 100644 index 0000000..492ce76 --- /dev/null +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java @@ -0,0 +1,420 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.handlers; + +import akka.dispatch.OnComplete; +import com.fasterxml.jackson.core.JsonGenerator; +import io.netty.buffer.Unpooled; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpVersion; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.messages.JobManagerMessages.CancelJobWithSavepoint; +import org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure; +import org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess; +import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; +import scala.concurrent.ExecutionContext; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +import javax.annotation.Nullable; +import java.io.IOException; 
+import java.io.StringWriter; +import java.nio.charset.Charset; +import java.util.ArrayDeque; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Request handler for {@link CancelJobWithSavepoint} messages. + */ +public class JobCancellationWithSavepointHandlers { + + /** URL for in-progress cancellations. */ + public static final String IN_PROGRESS_URL = "/jobs/:jobid/cancel-with-savepoint/in-progress/:requestId"; + + /** Encodings for String. */ + private static final Charset ENCODING = Charset.forName("UTF-8"); + + /** Shared lock between Trigger and In-Progress handlers. */ + private final Object lock = new Object(); + + /** In-Progress requests */ + private final Map<JobID, Long> inProgress = new HashMap<>(); + + /** Succeeded/failed request. Either String or Throwable. */ + private final Map<Long, Object> completed = new HashMap<>(); + + /** Atomic request counter */ + private long requestCounter; + + /** Handler for trigger requests. */ + private final TriggerHandler triggerHandler; + + /** Handler for in-progress requests. */ + private final InProgressHandler inProgressHandler; + + /** Default savepoint directory. 
*/ + private final String defaultSavepointDirectory; + + public JobCancellationWithSavepointHandlers( + ExecutionGraphHolder currentGraphs, + ExecutionContext executionContext) { + this(currentGraphs, executionContext, null); + } + + public JobCancellationWithSavepointHandlers( + ExecutionGraphHolder currentGraphs, + ExecutionContext executionContext, + @Nullable String defaultSavepointDirectory) { + + this.triggerHandler = new TriggerHandler(currentGraphs, executionContext); + this.inProgressHandler = new InProgressHandler(); + this.defaultSavepointDirectory = defaultSavepointDirectory; + } + + public TriggerHandler getTriggerHandler() { + return triggerHandler; + } + + public InProgressHandler getInProgressHandler() { + return inProgressHandler; + } + + // ------------------------------------------------------------------------ + // New requests + // ------------------------------------------------------------------------ + + /** + * Handler for triggering a {@link CancelJobWithSavepoint} message. + */ + class TriggerHandler implements RequestHandler { + + /** Current execution graphs. */ + private final ExecutionGraphHolder currentGraphs; + + /** Execution context for futures. 
*/ + private final ExecutionContext executionContext; + + public TriggerHandler(ExecutionGraphHolder currentGraphs, ExecutionContext executionContext) { + this.currentGraphs = checkNotNull(currentGraphs); + this.executionContext = checkNotNull(executionContext); + } + + @Override + @SuppressWarnings("unchecked") + public FullHttpResponse handleRequest( + Map<String, String> pathParams, + Map<String, String> queryParams, + ActorGateway jobManager) throws Exception { + + try { + if (jobManager != null) { + JobID jobId = JobID.fromHexString(pathParams.get("jobid")); + + AccessExecutionGraph graph = currentGraphs.getExecutionGraph(jobId, jobManager); + if (graph == null) { + throw new Exception("Cannot find ExecutionGraph for job."); + } else { + CheckpointCoordinator coord = graph.getCheckpointCoordinator(); + if (coord == null) { + throw new Exception("Cannot find CheckpointCoordinator for job."); + } + + String targetDirectory = pathParams.get("targetDirectory"); + if (targetDirectory == null) { + if (defaultSavepointDirectory == null) { + throw new IllegalStateException("No savepoint directory configured. 
" + + "You can either specify a directory when triggering this savepoint or " + + "configure a cluster-wide default via key '" + + ConfigConstants.SAVEPOINT_DIRECTORY_KEY + "'."); + } else { + targetDirectory = defaultSavepointDirectory; + } + } + + return handleNewRequest(jobManager, jobId, targetDirectory, coord.getCheckpointTimeout()); + } + } else { + throw new Exception("No connection to the leading JobManager."); + } + } catch (Exception e) { + throw new Exception("Failed to cancel the job: " + e.getMessage(), e); + } + } + + @SuppressWarnings("unchecked") + private FullHttpResponse handleNewRequest(ActorGateway jobManager, final JobID jobId, String targetDirectory, long checkpointTimeout) throws IOException { + // Check whether a request exists + final long requestId; + final boolean isNewRequest; + synchronized (lock) { + if (inProgress.containsKey(jobId)) { + requestId = inProgress.get(jobId); + isNewRequest = false; + } else { + requestId = ++requestCounter; + inProgress.put(jobId, requestId); + isNewRequest = true; + } + } + + if (isNewRequest) { + boolean success = false; + + try { + // Trigger cancellation + Object msg = new CancelJobWithSavepoint(jobId, targetDirectory); + Future<Object> cancelFuture = jobManager + .ask(msg, FiniteDuration.apply(checkpointTimeout, "ms")); + + cancelFuture.onComplete(new OnComplete<Object>() { + @Override + public void onComplete(Throwable failure, Object resp) throws Throwable { + synchronized (lock) { + try { + if (resp != null) { + if (resp.getClass() == CancellationSuccess.class) { + String path = ((CancellationSuccess) resp).savepointPath(); + completed.put(requestId, path); + } else if (resp.getClass() == CancellationFailure.class) { + Throwable cause = ((CancellationFailure) resp).cause(); + completed.put(requestId, cause); + } else { + Throwable cause = new IllegalStateException("Unexpected CancellationResponse of type " + resp.getClass()); + completed.put(requestId, cause); + } + } else { + 
completed.put(requestId, failure); + } + } finally { + inProgress.remove(jobId); + } + } + } + }, executionContext); + + success = true; + } finally { + synchronized (lock) { + if (!success) { + inProgress.remove(jobId); + } + } + } + } + + // In-progress location + String location = IN_PROGRESS_URL + .replace(":jobid", jobId.toString()) + .replace(":requestId", Long.toString(requestId)); + + // Accepted response + StringWriter writer = new StringWriter(); + JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); + gen.writeStartObject(); + gen.writeStringField("status", "accepted"); + gen.writeNumberField("request-id", requestId); + gen.writeStringField("location", location); + gen.writeEndObject(); + gen.close(); + + String json = writer.toString(); + byte[] bytes = json.getBytes(ENCODING); + + DefaultFullHttpResponse response = new DefaultFullHttpResponse( + HttpVersion.HTTP_1_1, + HttpResponseStatus.ACCEPTED, + Unpooled.wrappedBuffer(bytes)); + + response.headers().set(HttpHeaders.Names.LOCATION, location); + + response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json"); + response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes()); + + FullHttpResponse accepted = response; + + return accepted; + } + } + + // ------------------------------------------------------------------------ + // In-progress requests + // ------------------------------------------------------------------------ + + /** + * Handler for in-progress cancel with savepoint operations. + */ + class InProgressHandler implements RequestHandler { + + /** The number of recent checkpoints whose IDs are remembered. 
*/ + private static final int NUM_GHOST_REQUEST_IDS = 16; + + /** Remember some recently completed */ + private final ArrayDeque<Tuple2<Long, Object>> recentlyCompleted = new ArrayDeque<>(NUM_GHOST_REQUEST_IDS); + + @Override + @SuppressWarnings("unchecked") + public FullHttpResponse handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { + try { + if (jobManager != null) { + JobID jobId = JobID.fromHexString(pathParams.get("jobid")); + long requestId = Long.parseLong(pathParams.get("requestId")); + + synchronized (lock) { + Object result = completed.remove(requestId); + + if (result != null) { + // Add to recent history + recentlyCompleted.add(new Tuple2<>(requestId, result)); + if (recentlyCompleted.size() > NUM_GHOST_REQUEST_IDS) { + recentlyCompleted.remove(); + } + + if (result.getClass() == String.class) { + String savepointPath = (String) result; + return createSuccessResponse(requestId, savepointPath); + } else { + Throwable cause = (Throwable) result; + return createFailureResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR, requestId, cause.getMessage()); + } + } else { + // Check in-progress + Long inProgressRequestId = inProgress.get(jobId); + if (inProgressRequestId != null) { + // Sanity check + if (inProgressRequestId == requestId) { + return createInProgressResponse(requestId); + } else { + String msg= "Request ID does not belong to JobID"; + return createFailureResponse(HttpResponseStatus.BAD_REQUEST, requestId, msg); + } + } + + // Check recent history + for (Tuple2<Long, Object> recent : recentlyCompleted) { + if (recent.f0 == requestId) { + if (recent.f1.getClass() == String.class) { + String savepointPath = (String) recent.f1; + return createSuccessResponse(requestId, savepointPath); + } else { + Throwable cause = (Throwable) recent.f1; + return createFailureResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR, requestId, cause.getMessage()); + } + } + } + + return 
createFailureResponse(HttpResponseStatus.BAD_REQUEST, requestId, "Unknown job/request ID"); + } + } + } else { + throw new Exception("No connection to the leading JobManager."); + } + } catch (Exception e) { + throw new Exception("Failed to cancel the job: " + e.getMessage(), e); + } + } + + private FullHttpResponse createSuccessResponse(long requestId, String savepointPath) throws IOException { + StringWriter writer = new StringWriter(); + JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); + gen.writeStartObject(); + + gen.writeStringField("status", "success"); + gen.writeNumberField("request-id", requestId); + gen.writeStringField("savepoint-path", savepointPath); + + gen.writeEndObject(); + gen.close(); + + String json = writer.toString(); + byte[] bytes = json.getBytes(ENCODING); + + DefaultFullHttpResponse response = new DefaultFullHttpResponse( + HttpVersion.HTTP_1_1, + HttpResponseStatus.CREATED, + Unpooled.wrappedBuffer(bytes)); + + response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json"); + response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes()); + + return response; + } + + private FullHttpResponse createInProgressResponse(long requestId) throws IOException { + StringWriter writer = new StringWriter(); + JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); + gen.writeStartObject(); + + gen.writeStringField("status", "in-progress"); + gen.writeNumberField("request-id", requestId); + + gen.writeEndObject(); + gen.close(); + + String json = writer.toString(); + byte[] bytes = json.getBytes(ENCODING); + + DefaultFullHttpResponse response = new DefaultFullHttpResponse( + HttpVersion.HTTP_1_1, + HttpResponseStatus.ACCEPTED, + Unpooled.wrappedBuffer(bytes)); + + response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json"); + response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes()); + + return response; + } + + private 
FullHttpResponse createFailureResponse(HttpResponseStatus code, long requestId, String errMsg) throws IOException { + StringWriter writer = new StringWriter(); + JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); + gen.writeStartObject(); + + gen.writeStringField("status", "failed"); + gen.writeNumberField("request-id", requestId); + gen.writeStringField("cause", errMsg); + + gen.writeEndObject(); + gen.close(); + + String json = writer.toString(); + byte[] bytes = json.getBytes(ENCODING); + + DefaultFullHttpResponse response = new DefaultFullHttpResponse( + HttpVersion.HTTP_1_1, + code, + Unpooled.wrappedBuffer(bytes)); + + response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json"); + response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes()); + + return response; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3bc9cad0/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java new file mode 100644 index 0000000..cebb14e --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java @@ -0,0 +1,314 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.handlers; + +import akka.dispatch.ExecutionContexts$; +import akka.dispatch.Futures; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpResponseStatus; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.messages.JobManagerMessages.CancelJobWithSavepoint; +import org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess; +import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.map.ObjectMapper; +import org.junit.Test; +import scala.concurrent.ExecutionContext; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; +import scala.concurrent.impl.Promise; + +import java.nio.charset.Charset; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class 
JobCancellationWithSavepointHandlersTest { + + private static final ExecutionContext EC = ExecutionContexts$.MODULE$.fromExecutor(Executors.directExecutor()); + + /** + * Tests that the cancellation ask timeout respects the checkpoint timeout. + * Otherwise, AskTimeoutExceptions are bound to happen for large state. + */ + @Test + public void testAskTimeoutEqualsCheckpointTimeout() throws Exception { + long timeout = 128288238L; + JobID jobId = new JobID(); + ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class); + ExecutionGraph graph = mock(ExecutionGraph.class); + CheckpointCoordinator coord = mock(CheckpointCoordinator.class); + when(holder.getExecutionGraph(eq(jobId), any(ActorGateway.class))).thenReturn(graph); + when(graph.getCheckpointCoordinator()).thenReturn(coord); + when(coord.getCheckpointTimeout()).thenReturn(timeout); + + JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, EC); + JobCancellationWithSavepointHandlers.TriggerHandler handler = handlers.getTriggerHandler(); + + Map<String, String> params = new HashMap<>(); + params.put("jobid", jobId.toString()); + params.put("targetDirectory", "placeholder"); + + ActorGateway jobManager = mock(ActorGateway.class); + + Future<Object> future = Futures.successful((Object) new CancellationSuccess(jobId, null)); + when(jobManager.ask(any(Object.class), any(FiniteDuration.class))).thenReturn(future); + + handler.handleRequest(params, Collections.<String, String>emptyMap(), jobManager); + + verify(jobManager).ask(any(CancelJobWithSavepoint.class), eq(FiniteDuration.apply(timeout, "ms"))); + } + + /** + * Tests that the savepoint directory configuration is respected. 
+ */ + @Test + public void testSavepointDirectoryConfiguration() throws Exception { + long timeout = 128288238L; + JobID jobId = new JobID(); + ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class); + ExecutionGraph graph = mock(ExecutionGraph.class); + CheckpointCoordinator coord = mock(CheckpointCoordinator.class); + when(holder.getExecutionGraph(eq(jobId), any(ActorGateway.class))).thenReturn(graph); + when(graph.getCheckpointCoordinator()).thenReturn(coord); + when(coord.getCheckpointTimeout()).thenReturn(timeout); + + JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, EC, "the-default-directory"); + JobCancellationWithSavepointHandlers.TriggerHandler handler = handlers.getTriggerHandler(); + + Map<String, String> params = new HashMap<>(); + params.put("jobid", jobId.toString()); + + ActorGateway jobManager = mock(ActorGateway.class); + + Future<Object> future = Futures.successful((Object) new CancellationSuccess(jobId, null)); + when(jobManager.ask(any(Object.class), any(FiniteDuration.class))).thenReturn(future); + + // 1. Use targetDirectory path param + params.put("targetDirectory", "custom-directory"); + handler.handleRequest(params, Collections.<String, String>emptyMap(), jobManager); + + verify(jobManager).ask(eq(new CancelJobWithSavepoint(jobId, "custom-directory")), eq(FiniteDuration.apply(timeout, "ms"))); + + // 2. Use default + params.remove("targetDirectory"); + + handler.handleRequest(params, Collections.<String, String>emptyMap(), jobManager); + + verify(jobManager).ask(eq(new CancelJobWithSavepoint(jobId, "the-default-directory")), eq(FiniteDuration.apply(timeout, "ms"))); + + // 3. 
Throw Exception + handlers = new JobCancellationWithSavepointHandlers(holder, EC, null); + handler = handlers.getTriggerHandler(); + + try { + handler.handleRequest(params, Collections.<String, String>emptyMap(), jobManager); + fail("Did not throw expected test Exception"); + } catch (Exception e) { + IllegalStateException cause = (IllegalStateException) e.getCause(); + assertEquals(true, cause.getMessage().contains(ConfigConstants.SAVEPOINT_DIRECTORY_KEY)); + } + } + + /** + * Tests triggering a new request and monitoring it. + */ + @Test + public void testTriggerNewRequest() throws Exception { + JobID jobId = new JobID(); + ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class); + ExecutionGraph graph = mock(ExecutionGraph.class); + CheckpointCoordinator coord = mock(CheckpointCoordinator.class); + when(holder.getExecutionGraph(eq(jobId), any(ActorGateway.class))).thenReturn(graph); + when(graph.getCheckpointCoordinator()).thenReturn(coord); + + JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, EC); + JobCancellationWithSavepointHandlers.TriggerHandler trigger = handlers.getTriggerHandler(); + JobCancellationWithSavepointHandlers.InProgressHandler progress = handlers.getInProgressHandler(); + + Map<String, String> params = new HashMap<>(); + params.put("jobid", jobId.toString()); + params.put("targetDirectory", "custom-directory"); + + ActorGateway jobManager = mock(ActorGateway.class); + + // Successful + Promise<Object> promise = new Promise.DefaultPromise<>(); + when(jobManager.ask(any(Object.class), any(FiniteDuration.class))).thenReturn(promise); + + // Trigger + FullHttpResponse response = trigger.handleRequest(params, Collections.<String, String>emptyMap(), jobManager); + + verify(jobManager).ask(eq(new CancelJobWithSavepoint(jobId, "custom-directory")), any(FiniteDuration.class)); + + String location = String.format("/jobs/%s/cancel-with-savepoint/in-progress/1", jobId); + + 
assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus()); + assertEquals("application/json", response.headers().get(HttpHeaders.Names.CONTENT_TYPE)); + assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH)); + assertEquals(location, response.headers().get(HttpHeaders.Names.LOCATION)); + + String json = response.content().toString(Charset.forName("UTF-8")); + JsonNode root = new ObjectMapper().readTree(json); + + assertEquals("accepted", root.get("status").getValueAsText()); + assertEquals("1", root.get("request-id").getValueAsText()); + assertEquals(location, root.get("location").getValueAsText()); + + // Trigger again + response = trigger.handleRequest(params, Collections.<String, String>emptyMap(), jobManager); + assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus()); + assertEquals("application/json", response.headers().get(HttpHeaders.Names.CONTENT_TYPE)); + assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH)); + assertEquals(location, response.headers().get(HttpHeaders.Names.LOCATION)); + + json = response.content().toString(Charset.forName("UTF-8")); + root = new ObjectMapper().readTree(json); + + assertEquals("accepted", root.get("status").getValueAsText()); + assertEquals("1", root.get("request-id").getValueAsText()); + assertEquals(location, root.get("location").getValueAsText()); + + // Only single actual request + verify(jobManager).ask(eq(new CancelJobWithSavepoint(jobId, "custom-directory")), any(FiniteDuration.class)); + + // Query progress + params.put("requestId", "1"); + + response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager); + assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus()); + assertEquals("application/json", response.headers().get(HttpHeaders.Names.CONTENT_TYPE)); + assertEquals(Integer.toString(response.content().readableBytes()), 
response.headers().get(HttpHeaders.Names.CONTENT_LENGTH)); + + json = response.content().toString(Charset.forName("UTF-8")); + root = new ObjectMapper().readTree(json); + + assertEquals("in-progress", root.get("status").getValueAsText()); + assertEquals("1", root.get("request-id").getValueAsText()); + + // Complete + promise.success(new CancellationSuccess(jobId, "_path-savepoint_")); + + response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager); + + assertEquals(HttpResponseStatus.CREATED, response.getStatus()); + assertEquals("application/json", response.headers().get(HttpHeaders.Names.CONTENT_TYPE)); + assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH)); + + json = response.content().toString(Charset.forName("UTF-8")); + + root = new ObjectMapper().readTree(json); + + assertEquals("success", root.get("status").getValueAsText()); + assertEquals("1", root.get("request-id").getValueAsText()); + assertEquals("_path-savepoint_", root.get("savepoint-path").getValueAsText()); + + // Query again, keep recent history + + response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager); + + assertEquals(HttpResponseStatus.CREATED, response.getStatus()); + assertEquals("application/json", response.headers().get(HttpHeaders.Names.CONTENT_TYPE)); + assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH)); + + json = response.content().toString(Charset.forName("UTF-8")); + + root = new ObjectMapper().readTree(json); + + assertEquals("success", root.get("status").getValueAsText()); + assertEquals("1", root.get("request-id").getValueAsText()); + assertEquals("_path-savepoint_", root.get("savepoint-path").getValueAsText()); + + // Query for unknown request + params.put("requestId", "9929"); + + response = progress.handleRequest(params, Collections.<String, String>emptyMap(), 
jobManager); + assertEquals(HttpResponseStatus.BAD_REQUEST, response.getStatus()); + assertEquals("application/json", response.headers().get(HttpHeaders.Names.CONTENT_TYPE)); + assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH)); + + json = response.content().toString(Charset.forName("UTF-8")); + + root = new ObjectMapper().readTree(json); + + assertEquals("failed", root.get("status").getValueAsText()); + assertEquals("9929", root.get("request-id").getValueAsText()); + assertEquals("Unknown job/request ID", root.get("cause").getValueAsText()); + } + + /** + * Tests response when a request fails. + */ + @Test + public void testFailedCancellation() throws Exception { + JobID jobId = new JobID(); + ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class); + ExecutionGraph graph = mock(ExecutionGraph.class); + CheckpointCoordinator coord = mock(CheckpointCoordinator.class); + when(holder.getExecutionGraph(eq(jobId), any(ActorGateway.class))).thenReturn(graph); + when(graph.getCheckpointCoordinator()).thenReturn(coord); + + JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, EC); + JobCancellationWithSavepointHandlers.TriggerHandler trigger = handlers.getTriggerHandler(); + JobCancellationWithSavepointHandlers.InProgressHandler progress = handlers.getInProgressHandler(); + + Map<String, String> params = new HashMap<>(); + params.put("jobid", jobId.toString()); + params.put("targetDirectory", "custom-directory"); + + ActorGateway jobManager = mock(ActorGateway.class); + + // Successful + Future<Object> future = Futures.failed(new Exception("Test Exception")); + when(jobManager.ask(any(Object.class), any(FiniteDuration.class))).thenReturn(future); + + // Trigger + trigger.handleRequest(params, Collections.<String, String>emptyMap(), jobManager); + verify(jobManager).ask(eq(new CancelJobWithSavepoint(jobId, "custom-directory")), 
any(FiniteDuration.class)); + + // Query progress + params.put("requestId", "1"); + + FullHttpResponse response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager); + assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, response.getStatus()); + assertEquals("application/json", response.headers().get(HttpHeaders.Names.CONTENT_TYPE)); + assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH)); + + String json = response.content().toString(Charset.forName("UTF-8")); + JsonNode root = new ObjectMapper().readTree(json); + + assertEquals("failed", root.get("status").getValueAsText()); + assertEquals("1", root.get("request-id").getValueAsText()); + assertEquals("Test Exception", root.get("cause").getValueAsText()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3bc9cad0/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java index 13a9067..fe7ceef 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java @@ -100,7 +100,7 @@ public class AbstractMetricsHandlerTest extends TestLogger { pathParams.put("jobid", "nonexistent"); try { - assertEquals("", handler.handleRequest(pathParams, queryParams, null)); + assertEquals("", handler.handleJsonRequest(pathParams, queryParams, null)); } catch (Exception e) { fail(); } @@ -126,7 +126,7 @@ public class AbstractMetricsHandlerTest extends TestLogger { queryParams.put("get", ""); try { - 
assertEquals("", handler.handleRequest(pathParams, queryParams, null)); + assertEquals("", handler.handleJsonRequest(pathParams, queryParams, null)); } catch (Exception e) { fail(e.getMessage()); } @@ -136,7 +136,7 @@ public class AbstractMetricsHandlerTest extends TestLogger { queryParams.put("get", "subindex.opname.abc.metric5"); try { - assertEquals("", handler.handleRequest(pathParams, queryParams, null)); + assertEquals("", handler.handleJsonRequest(pathParams, queryParams, null)); } catch (Exception e) { fail(e.getMessage()); } @@ -146,7 +146,7 @@ public class AbstractMetricsHandlerTest extends TestLogger { queryParams.put("get", "subindex.opname.abc.nonexistant"); try { - assertEquals("", handler.handleRequest(pathParams, queryParams, null)); + assertEquals("", handler.handleJsonRequest(pathParams, queryParams, null)); } catch (Exception e) { fail(e.getMessage()); } http://git-wip-us.apache.org/repos/asf/flink/blob/3bc9cad0/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 588ba84..698c2f0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -835,6 +835,10 @@ public class CheckpointCoordinator { return checkpointIdCounter; } + public long getCheckpointTimeout() { + return checkpointTimeout; + } + // -------------------------------------------------------------------------------------------- // Periodic scheduling of checkpoints // -------------------------------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/flink/blob/3bc9cad0/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java index 0ff6ace..0fd97da 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java @@ -21,6 +21,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker; import org.apache.flink.api.common.ArchivedExecutionConfig; import org.apache.flink.runtime.jobgraph.JobStatus; @@ -108,6 +109,14 @@ public interface AccessExecutionGraph { long getStatusTimestamp(JobStatus status); /** + * Returns the {@link CheckpointCoordinator} for this execution graph. + * + * @return CheckpointCoordinator for this execution graph or <code>null</code> + * if none is available. + */ + CheckpointCoordinator getCheckpointCoordinator(); + + /** * Returns the {@link CheckpointStatsTracker} for this execution graph. 
* * @return CheckpointStatsTracker for this execution graph http://git-wip-us.apache.org/repos/asf/flink/blob/3bc9cad0/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java index 493825a..d8c58c8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; import org.apache.flink.runtime.checkpoint.ArchivedCheckpointStatsTracker; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; @@ -198,6 +199,11 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl } @Override + public CheckpointCoordinator getCheckpointCoordinator() { + return null; + } + + @Override public CheckpointStatsTracker getCheckpointStatsTracker() { return tracker; } http://git-wip-us.apache.org/repos/asf/flink/blob/3bc9cad0/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 101bdba..0a79cf2 100644 ---
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -416,6 +416,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive } } + @Override public CheckpointCoordinator getCheckpointCoordinator() { return checkpointCoordinator; }
