Repository: flink
Updated Branches:
  refs/heads/master e9b20ec21 -> 3bc9cad04


[FLINK-4787] [runtime-web] Add JobCancellationWithSavepointHandlers

- Add handlers for triggering and monitoring job cancellation with
  savepoints.

This closes #2626.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3bc9cad0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3bc9cad0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3bc9cad0

Branch: refs/heads/master
Commit: 3bc9cad045b25d413f0b9f054fff12fac18a4f0e
Parents: 2fb6009
Author: Ufuk Celebi <[email protected]>
Authored: Tue Oct 11 10:09:20 2016 +0200
Committer: Ufuk Celebi <[email protected]>
Committed: Fri Oct 28 11:04:12 2016 +0200

----------------------------------------------------------------------
 docs/monitoring/rest_api.md                     |  70 +++-
 .../runtime/webmonitor/WebRuntimeMonitor.java   |  12 +
 .../JobCancellationWithSavepointHandlers.java   | 420 +++++++++++++++++++
 ...obCancellationWithSavepointHandlersTest.java | 314 ++++++++++++++
 .../metrics/AbstractMetricsHandlerTest.java     |   8 +-
 .../checkpoint/CheckpointCoordinator.java       |   4 +
 .../executiongraph/AccessExecutionGraph.java    |   9 +
 .../executiongraph/ArchivedExecutionGraph.java  |   6 +
 .../runtime/executiongraph/ExecutionGraph.java  |   1 +
 9 files changed, 839 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3bc9cad0/docs/monitoring/rest_api.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/rest_api.md b/docs/monitoring/rest_api.md
index e7fccc5..e84e2cc 100644
--- a/docs/monitoring/rest_api.md
+++ b/docs/monitoring/rest_api.md
@@ -31,7 +31,6 @@ The monitoring API is a REST-ful API that accepts HTTP GET 
requests and responds
 {:toc}
 
 
-
 ## Overview
 
 The monitoring API is backed by a web server that runs as part of the 
*JobManager*. By default, this server listens at post `8081`, which can be 
configured in `flink-conf.yaml` via `jobmanager.web.port`. Note that the 
monitoring API web server and the web dashboard web server are currently the 
same and thus run together at the same port. They respond to different HTTP 
URLs, though.
@@ -584,3 +583,72 @@ Sample Result:
   } ]
 }
 ~~~
+
+### Job Cancellation
+
+#### Cancel Job
+
+`DELETE` request to **`/jobs/:jobid/cancel`**.
+
+Triggers job cancellation, result on success is `{}`.
+
+#### Cancel Job with Savepoint
+
+Triggers a savepoint and cancels the job after the savepoint succeeds.
+
+`GET` request to **`/jobs/:jobid/cancel-with-savepoint/`** triggers a 
savepoint to the default savepoint directory and cancels the job.
+
+`GET` request to 
**`/jobs/:jobid/cancel-with-savepoint/target-directory/:targetDirectory`** 
triggers a savepoint to the given target directory and cancels the job.
+
+Since savepoints can take some time to complete this operation happens 
asynchronously. The result to this request is the location of the in-progress 
cancellation.
+
+Sample Trigger Result:
+
+~~~
+{
+  "status": "accepted",
+  "request-id": 1,
+  "location": "/jobs/:jobid/cancel-with-savepoint/in-progress/1"
+}
+~~~
+
+##### Monitoring Progress
+
+The progress of the cancellation has to be monitored by the user at
+
+~~~
+/jobs/:jobid/cancel-with-savepoint/in-progress/:requestId
+~~~
+
+The request ID is returned by the trigger result.
+
+###### In-Progress
+
+~~~
+{
+  "status": "in-progress",
+  "request-id": 1
+}
+~~~
+
+###### Success
+
+~~~
+{
+  "status": "success",
+  "request-id": 1,
+  "savepoint-path": "<savepointPath>"
+}
+~~~
+
+The `savepointPath` points to the external path of the savepoint, which can be 
used to resume the savepoint.
+
+###### Failed
+
+~~~
+{
+  "status": "failed",
+  "request-id": 1,
+  "cause": "<error message>"
+}
+~~~

http://git-wip-us.apache.org/repos/asf/flink/blob/3bc9cad0/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 3e2634f..e907124 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -50,6 +50,7 @@ import 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JarUploadHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobAccumulatorsHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobCancellationHandler;
+import 
org.apache.flink.runtime.webmonitor.handlers.JobCancellationWithSavepointHandlers;
 import org.apache.flink.runtime.webmonitor.handlers.JobCheckpointsHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobConfigHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobDetailsHandler;
@@ -238,6 +239,12 @@ public class WebRuntimeMonitor implements WebMonitor {
                }
                metricFetcher = new MetricFetcher(actorSystem, retriever, 
context);
 
+               String defaultSavepointDir = 
config.getString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY, null);
+
+               JobCancellationWithSavepointHandlers cancelWithSavepoint = new 
JobCancellationWithSavepointHandlers(currentGraphs, context, 
defaultSavepointDir);
+               RuntimeMonitorHandler triggerHandler = 
handler(cancelWithSavepoint.getTriggerHandler());
+               RuntimeMonitorHandler inProgressHandler = 
handler(cancelWithSavepoint.getInProgressHandler());
+
                router = new Router()
                        // config how to interact with this web server
                        .GET("/config", handler(new 
DashboardConfigHandler(cfg.getRefreshInterval())))
@@ -307,6 +314,10 @@ public class WebRuntimeMonitor implements WebMonitor {
                        // DELETE is the preferred way of canceling a job 
(Rest-conform)
                        .DELETE("/jobs/:jobid/cancel", handler(new 
JobCancellationHandler()))
 
+                       .GET("/jobs/:jobid/cancel-with-savepoint", 
triggerHandler)
+                       
.GET("/jobs/:jobid/cancel-with-savepoint/target-directory/:targetDirectory", 
triggerHandler)
+                       
.GET(JobCancellationWithSavepointHandlers.IN_PROGRESS_URL, inProgressHandler)
+
                        // stop a job via GET (for proper integration with YARN 
this has to be performed via GET)
                        .GET("/jobs/:jobid/yarn-stop", handler(new 
JobStoppingHandler()))
 
@@ -489,6 +500,7 @@ public class WebRuntimeMonitor implements WebMonitor {
        // 
------------------------------------------------------------------------
        //  Utilities
        // 
------------------------------------------------------------------------
+
        private RuntimeMonitorHandler handler(RequestHandler handler) {
                return new RuntimeMonitorHandler(handler, retriever, 
jobManagerAddressPromise.future(), timeout,
                        serverSSLContext !=  null);

http://git-wip-us.apache.org/repos/asf/flink/blob/3bc9cad0/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
new file mode 100644
index 0000000..492ce76
--- /dev/null
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import akka.dispatch.OnComplete;
+import com.fasterxml.jackson.core.JsonGenerator;
+import io.netty.buffer.Unpooled;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpVersion;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.instance.ActorGateway;
+import 
org.apache.flink.runtime.messages.JobManagerMessages.CancelJobWithSavepoint;
+import 
org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
+import 
org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.charset.Charset;
+import java.util.ArrayDeque;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Request handler for {@link CancelJobWithSavepoint} messages.
+ */
+public class JobCancellationWithSavepointHandlers {
+
+       /** URL for in-progress cancellations. */
+       public static final String IN_PROGRESS_URL = 
"/jobs/:jobid/cancel-with-savepoint/in-progress/:requestId";
+
+       /** Encodings for String. */
+       private static final Charset ENCODING = Charset.forName("UTF-8");
+
+       /** Shared lock between Trigger and In-Progress handlers. */
+       private final Object lock = new Object();
+
+       /** In-Progress requests */
+       private final Map<JobID, Long> inProgress = new HashMap<>();
+
+       /** Succeeded/failed request. Either String or Throwable. */
+       private final Map<Long, Object> completed = new HashMap<>();
+
+       /** Atomic request counter */
+       private long requestCounter;
+
+       /** Handler for trigger requests. */
+       private final TriggerHandler triggerHandler;
+
+       /** Handler for in-progress requests. */
+       private final InProgressHandler inProgressHandler;
+
+       /** Default savepoint directory. */
+       private final String defaultSavepointDirectory;
+
+       public JobCancellationWithSavepointHandlers(
+                       ExecutionGraphHolder currentGraphs,
+                       ExecutionContext executionContext) {
+               this(currentGraphs, executionContext, null);
+       }
+
+       public JobCancellationWithSavepointHandlers(
+                       ExecutionGraphHolder currentGraphs,
+                       ExecutionContext executionContext,
+                       @Nullable String defaultSavepointDirectory) {
+
+               this.triggerHandler = new TriggerHandler(currentGraphs, 
executionContext);
+               this.inProgressHandler = new InProgressHandler();
+               this.defaultSavepointDirectory = defaultSavepointDirectory;
+       }
+
+       public TriggerHandler getTriggerHandler() {
+               return triggerHandler;
+       }
+
+       public InProgressHandler getInProgressHandler() {
+               return inProgressHandler;
+       }
+
+       // 
------------------------------------------------------------------------
+       // New requests
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Handler for triggering a {@link CancelJobWithSavepoint} message.
+        */
+       class TriggerHandler implements RequestHandler {
+
+               /** Current execution graphs. */
+               private final ExecutionGraphHolder currentGraphs;
+
+               /** Execution context for futures. */
+               private final ExecutionContext executionContext;
+
+               public TriggerHandler(ExecutionGraphHolder currentGraphs, 
ExecutionContext executionContext) {
+                       this.currentGraphs = checkNotNull(currentGraphs);
+                       this.executionContext = checkNotNull(executionContext);
+               }
+
+               @Override
+               @SuppressWarnings("unchecked")
+               public FullHttpResponse handleRequest(
+                               Map<String, String> pathParams,
+                               Map<String, String> queryParams,
+                               ActorGateway jobManager) throws Exception {
+
+                       try {
+                               if (jobManager != null) {
+                                       JobID jobId = 
JobID.fromHexString(pathParams.get("jobid"));
+
+                                       AccessExecutionGraph graph = 
currentGraphs.getExecutionGraph(jobId, jobManager);
+                                       if (graph == null) {
+                                               throw new Exception("Cannot 
find ExecutionGraph for job.");
+                                       } else {
+                                               CheckpointCoordinator coord = 
graph.getCheckpointCoordinator();
+                                               if (coord == null) {
+                                                       throw new 
Exception("Cannot find CheckpointCoordinator for job.");
+                                               }
+
+                                               String targetDirectory = 
pathParams.get("targetDirectory");
+                                               if (targetDirectory == null) {
+                                                       if 
(defaultSavepointDirectory == null) {
+                                                               throw new 
IllegalStateException("No savepoint directory configured. " +
+                                                                               
"You can either specify a directory when triggering this savepoint or " +
+                                                                               
"configure a cluster-wide default via key '" +
+                                                                               
ConfigConstants.SAVEPOINT_DIRECTORY_KEY + "'.");
+                                                       } else {
+                                                               targetDirectory 
= defaultSavepointDirectory;
+                                                       }
+                                               }
+
+                                               return 
handleNewRequest(jobManager, jobId, targetDirectory, 
coord.getCheckpointTimeout());
+                                       }
+                               } else {
+                                       throw new Exception("No connection to 
the leading JobManager.");
+                               }
+                       } catch (Exception e) {
+                               throw new Exception("Failed to cancel the job: 
" + e.getMessage(), e);
+                       }
+               }
+
+               @SuppressWarnings("unchecked")
+               private FullHttpResponse handleNewRequest(ActorGateway 
jobManager, final JobID jobId, String targetDirectory, long checkpointTimeout) 
throws IOException {
+                       // Check whether a request exists
+                       final long requestId;
+                       final boolean isNewRequest;
+                       synchronized (lock) {
+                               if (inProgress.containsKey(jobId)) {
+                                       requestId = inProgress.get(jobId);
+                                       isNewRequest = false;
+                               } else {
+                                       requestId = ++requestCounter;
+                                       inProgress.put(jobId, requestId);
+                                       isNewRequest = true;
+                               }
+                       }
+
+                       if (isNewRequest) {
+                               boolean success = false;
+
+                               try {
+                                       // Trigger cancellation
+                                       Object msg = new 
CancelJobWithSavepoint(jobId, targetDirectory);
+                                       Future<Object> cancelFuture = jobManager
+                                                       .ask(msg, 
FiniteDuration.apply(checkpointTimeout, "ms"));
+
+                                       cancelFuture.onComplete(new 
OnComplete<Object>() {
+                                               @Override
+                                               public void 
onComplete(Throwable failure, Object resp) throws Throwable {
+                                                       synchronized (lock) {
+                                                               try {
+                                                                       if 
(resp != null) {
+                                                                               
if (resp.getClass() == CancellationSuccess.class) {
+                                                                               
        String path = ((CancellationSuccess) resp).savepointPath();
+                                                                               
        completed.put(requestId, path);
+                                                                               
} else if (resp.getClass() == CancellationFailure.class) {
+                                                                               
        Throwable cause = ((CancellationFailure) resp).cause();
+                                                                               
        completed.put(requestId, cause);
+                                                                               
} else {
+                                                                               
        Throwable cause = new IllegalStateException("Unexpected 
CancellationResponse of type " + resp.getClass());
+                                                                               
        completed.put(requestId, cause);
+                                                                               
}
+                                                                       } else {
+                                                                               
completed.put(requestId, failure);
+                                                                       }
+                                                               } finally {
+                                                                       
inProgress.remove(jobId);
+                                                               }
+                                                       }
+                                               }
+                                       }, executionContext);
+
+                                       success = true;
+                               } finally {
+                                       synchronized (lock) {
+                                               if (!success) {
+                                                       
inProgress.remove(jobId);
+                                               }
+                                       }
+                               }
+                       }
+
+                       // In-progress location
+                       String location = IN_PROGRESS_URL
+                                       .replace(":jobid", jobId.toString())
+                                       .replace(":requestId", 
Long.toString(requestId));
+
+                       // Accepted response
+                       StringWriter writer = new StringWriter();
+                       JsonGenerator gen = 
JsonFactory.jacksonFactory.createGenerator(writer);
+                       gen.writeStartObject();
+                       gen.writeStringField("status", "accepted");
+                       gen.writeNumberField("request-id", requestId);
+                       gen.writeStringField("location", location);
+                       gen.writeEndObject();
+                       gen.close();
+
+                       String json = writer.toString();
+                       byte[] bytes = json.getBytes(ENCODING);
+
+                       DefaultFullHttpResponse response = new 
DefaultFullHttpResponse(
+                                       HttpVersion.HTTP_1_1,
+                                       HttpResponseStatus.ACCEPTED,
+                                       Unpooled.wrappedBuffer(bytes));
+
+                       response.headers().set(HttpHeaders.Names.LOCATION, 
location);
+
+                       response.headers().set(HttpHeaders.Names.CONTENT_TYPE, 
"application/json");
+                       
response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, 
response.content().readableBytes());
+
+                       FullHttpResponse accepted = response;
+
+                       return accepted;
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       // In-progress requests
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Handler for in-progress cancel with savepoint operations.
+        */
+       class InProgressHandler implements RequestHandler {
+
+               /** The number of recent checkpoints whose IDs are remembered. 
*/
+               private static final int NUM_GHOST_REQUEST_IDS = 16;
+
+               /** Remember some recently completed */
+               private final ArrayDeque<Tuple2<Long, Object>> 
recentlyCompleted = new ArrayDeque<>(NUM_GHOST_REQUEST_IDS);
+
+               @Override
+               @SuppressWarnings("unchecked")
+               public FullHttpResponse handleRequest(Map<String, String> 
pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws 
Exception {
+                       try {
+                               if (jobManager != null) {
+                                       JobID jobId = 
JobID.fromHexString(pathParams.get("jobid"));
+                                       long requestId = 
Long.parseLong(pathParams.get("requestId"));
+
+                                       synchronized (lock) {
+                                               Object result = 
completed.remove(requestId);
+
+                                               if (result != null) {
+                                                       // Add to recent history
+                                                       
recentlyCompleted.add(new Tuple2<>(requestId, result));
+                                                       if 
(recentlyCompleted.size() > NUM_GHOST_REQUEST_IDS) {
+                                                               
recentlyCompleted.remove();
+                                                       }
+
+                                                       if (result.getClass() 
== String.class) {
+                                                               String 
savepointPath = (String) result;
+                                                               return 
createSuccessResponse(requestId, savepointPath);
+                                                       } else {
+                                                               Throwable cause 
= (Throwable) result;
+                                                               return 
createFailureResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR, requestId, 
cause.getMessage());
+                                                       }
+                                               } else {
+                                                       // Check in-progress
+                                                       Long 
inProgressRequestId = inProgress.get(jobId);
+                                                       if (inProgressRequestId 
!= null) {
+                                                               // Sanity check
+                                                               if 
(inProgressRequestId == requestId) {
+                                                                       return 
createInProgressResponse(requestId);
+                                                               } else {
+                                                                       String 
msg= "Request ID does not belong to JobID";
+                                                                       return 
createFailureResponse(HttpResponseStatus.BAD_REQUEST, requestId, msg);
+                                                               }
+                                                       }
+
+                                                       // Check recent history
+                                                       for (Tuple2<Long, 
Object> recent : recentlyCompleted) {
+                                                               if (recent.f0 
== requestId) {
+                                                                       if 
(recent.f1.getClass() == String.class) {
+                                                                               
String savepointPath = (String) recent.f1;
+                                                                               
return createSuccessResponse(requestId, savepointPath);
+                                                                       } else {
+                                                                               
Throwable cause = (Throwable) recent.f1;
+                                                                               
return createFailureResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR, 
requestId, cause.getMessage());
+                                                                       }
+                                                               }
+                                                       }
+
+                                                       return 
createFailureResponse(HttpResponseStatus.BAD_REQUEST, requestId, "Unknown 
job/request ID");
+                                               }
+                                       }
+                               } else {
+                                       throw new Exception("No connection to 
the leading JobManager.");
+                               }
+                       } catch (Exception e) {
+                               throw new Exception("Failed to cancel the job: 
" + e.getMessage(), e);
+                       }
+               }
+
+               private FullHttpResponse createSuccessResponse(long requestId, 
String savepointPath) throws IOException {
+                       StringWriter writer = new StringWriter();
+                       JsonGenerator gen = 
JsonFactory.jacksonFactory.createGenerator(writer);
+                       gen.writeStartObject();
+
+                       gen.writeStringField("status", "success");
+                       gen.writeNumberField("request-id", requestId);
+                       gen.writeStringField("savepoint-path", savepointPath);
+
+                       gen.writeEndObject();
+                       gen.close();
+
+                       String json = writer.toString();
+                       byte[] bytes = json.getBytes(ENCODING);
+
+                       DefaultFullHttpResponse response = new 
DefaultFullHttpResponse(
+                                       HttpVersion.HTTP_1_1,
+                                       HttpResponseStatus.CREATED,
+                                       Unpooled.wrappedBuffer(bytes));
+
+                       response.headers().set(HttpHeaders.Names.CONTENT_TYPE, 
"application/json");
+                       
response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, 
response.content().readableBytes());
+
+                       return response;
+               }
+
+               private FullHttpResponse createInProgressResponse(long 
requestId) throws IOException {
+                       StringWriter writer = new StringWriter();
+                       JsonGenerator gen = 
JsonFactory.jacksonFactory.createGenerator(writer);
+                       gen.writeStartObject();
+
+                       gen.writeStringField("status", "in-progress");
+                       gen.writeNumberField("request-id", requestId);
+
+                       gen.writeEndObject();
+                       gen.close();
+
+                       String json = writer.toString();
+                       byte[] bytes = json.getBytes(ENCODING);
+
+                       DefaultFullHttpResponse response = new 
DefaultFullHttpResponse(
+                                       HttpVersion.HTTP_1_1,
+                                       HttpResponseStatus.ACCEPTED,
+                                       Unpooled.wrappedBuffer(bytes));
+
+                       response.headers().set(HttpHeaders.Names.CONTENT_TYPE, 
"application/json");
+                       
response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, 
response.content().readableBytes());
+
+                       return response;
+               }
+
+               private FullHttpResponse 
createFailureResponse(HttpResponseStatus code, long requestId, String errMsg) 
throws IOException {
+                       StringWriter writer = new StringWriter();
+                       JsonGenerator gen = 
JsonFactory.jacksonFactory.createGenerator(writer);
+                       gen.writeStartObject();
+
+                       gen.writeStringField("status", "failed");
+                       gen.writeNumberField("request-id", requestId);
+                       gen.writeStringField("cause", errMsg);
+
+                       gen.writeEndObject();
+                       gen.close();
+
+                       String json = writer.toString();
+                       byte[] bytes = json.getBytes(ENCODING);
+
+                       DefaultFullHttpResponse response = new 
DefaultFullHttpResponse(
+                                       HttpVersion.HTTP_1_1,
+                                       code,
+                                       Unpooled.wrappedBuffer(bytes));
+
+                       response.headers().set(HttpHeaders.Names.CONTENT_TYPE, 
"application/json");
+                       
response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, 
response.content().readableBytes());
+
+                       return response;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3bc9cad0/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java
new file mode 100644
index 0000000..cebb14e
--- /dev/null
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import akka.dispatch.ExecutionContexts$;
+import akka.dispatch.Futures;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.instance.ActorGateway;
+import 
org.apache.flink.runtime.messages.JobManagerMessages.CancelJobWithSavepoint;
+import 
org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.Test;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+import scala.concurrent.impl.Promise;
+
+import java.nio.charset.Charset;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class JobCancellationWithSavepointHandlersTest {
+
+       private static final ExecutionContext EC = 
ExecutionContexts$.MODULE$.fromExecutor(Executors.directExecutor());
+
+       /**
+        * Tests that the cancellation ask timeout respects the checkpoint 
timeout.
+        * Otherwise, AskTimeoutExceptions are bound to happen for large state.
+        */
+       @Test
+       public void testAskTimeoutEqualsCheckpointTimeout() throws Exception {
+               long timeout = 128288238L;
+               JobID jobId = new JobID();
+               ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class);
+               ExecutionGraph graph = mock(ExecutionGraph.class);
+               CheckpointCoordinator coord = mock(CheckpointCoordinator.class);
+               when(holder.getExecutionGraph(eq(jobId), 
any(ActorGateway.class))).thenReturn(graph);
+               when(graph.getCheckpointCoordinator()).thenReturn(coord);
+               when(coord.getCheckpointTimeout()).thenReturn(timeout);
+
+               JobCancellationWithSavepointHandlers handlers = new 
JobCancellationWithSavepointHandlers(holder, EC);
+               JobCancellationWithSavepointHandlers.TriggerHandler handler = 
handlers.getTriggerHandler();
+
+               Map<String, String> params = new HashMap<>();
+               params.put("jobid", jobId.toString());
+               params.put("targetDirectory", "placeholder");
+
+               ActorGateway jobManager = mock(ActorGateway.class);
+
+               Future<Object> future = Futures.successful((Object) new 
CancellationSuccess(jobId, null));
+               when(jobManager.ask(any(Object.class), 
any(FiniteDuration.class))).thenReturn(future);
+
+               handler.handleRequest(params, Collections.<String, 
String>emptyMap(), jobManager);
+
+               verify(jobManager).ask(any(CancelJobWithSavepoint.class), 
eq(FiniteDuration.apply(timeout, "ms")));
+       }
+
+       /**
+        * Tests that the savepoint directory configuration is respected.
+        */
+       @Test
+       public void testSavepointDirectoryConfiguration() throws Exception {
+               long timeout = 128288238L;
+               JobID jobId = new JobID();
+               ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class);
+               ExecutionGraph graph = mock(ExecutionGraph.class);
+               CheckpointCoordinator coord = mock(CheckpointCoordinator.class);
+               when(holder.getExecutionGraph(eq(jobId), 
any(ActorGateway.class))).thenReturn(graph);
+               when(graph.getCheckpointCoordinator()).thenReturn(coord);
+               when(coord.getCheckpointTimeout()).thenReturn(timeout);
+
+               JobCancellationWithSavepointHandlers handlers = new 
JobCancellationWithSavepointHandlers(holder, EC, "the-default-directory");
+               JobCancellationWithSavepointHandlers.TriggerHandler handler = 
handlers.getTriggerHandler();
+
+               Map<String, String> params = new HashMap<>();
+               params.put("jobid", jobId.toString());
+
+               ActorGateway jobManager = mock(ActorGateway.class);
+
+               Future<Object> future = Futures.successful((Object) new 
CancellationSuccess(jobId, null));
+               when(jobManager.ask(any(Object.class), 
any(FiniteDuration.class))).thenReturn(future);
+
+               // 1. Use targetDirectory path param
+               params.put("targetDirectory", "custom-directory");
+               handler.handleRequest(params, Collections.<String, 
String>emptyMap(), jobManager);
+
+               verify(jobManager).ask(eq(new CancelJobWithSavepoint(jobId, 
"custom-directory")), eq(FiniteDuration.apply(timeout, "ms")));
+
+               // 2. Use default
+               params.remove("targetDirectory");
+
+               handler.handleRequest(params, Collections.<String, 
String>emptyMap(), jobManager);
+
+               verify(jobManager).ask(eq(new CancelJobWithSavepoint(jobId, 
"the-default-directory")), eq(FiniteDuration.apply(timeout, "ms")));
+
+               // 3. Throw Exception
+               handlers = new JobCancellationWithSavepointHandlers(holder, EC, 
null);
+               handler = handlers.getTriggerHandler();
+
+               try {
+                       handler.handleRequest(params, Collections.<String, 
String>emptyMap(), jobManager);
+                       fail("Did not throw expected test Exception");
+               } catch (Exception e) {
+                       IllegalStateException cause = (IllegalStateException) 
e.getCause();
+                       assertEquals(true, 
cause.getMessage().contains(ConfigConstants.SAVEPOINT_DIRECTORY_KEY));
+               }
+       }
+
+       /**
+        * Tests triggering a new request and monitoring it.
+        */
+       @Test
+       public void testTriggerNewRequest() throws Exception {
+               JobID jobId = new JobID();
+               ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class);
+               ExecutionGraph graph = mock(ExecutionGraph.class);
+               CheckpointCoordinator coord = mock(CheckpointCoordinator.class);
+               when(holder.getExecutionGraph(eq(jobId), 
any(ActorGateway.class))).thenReturn(graph);
+               when(graph.getCheckpointCoordinator()).thenReturn(coord);
+
+               JobCancellationWithSavepointHandlers handlers = new 
JobCancellationWithSavepointHandlers(holder, EC);
+               JobCancellationWithSavepointHandlers.TriggerHandler trigger = 
handlers.getTriggerHandler();
+               JobCancellationWithSavepointHandlers.InProgressHandler progress 
= handlers.getInProgressHandler();
+
+               Map<String, String> params = new HashMap<>();
+               params.put("jobid", jobId.toString());
+               params.put("targetDirectory", "custom-directory");
+
+               ActorGateway jobManager = mock(ActorGateway.class);
+
+               // Successful
+               Promise<Object> promise = new Promise.DefaultPromise<>();
+               when(jobManager.ask(any(Object.class), 
any(FiniteDuration.class))).thenReturn(promise);
+
+               // Trigger
+               FullHttpResponse response = trigger.handleRequest(params, 
Collections.<String, String>emptyMap(), jobManager);
+
+               verify(jobManager).ask(eq(new CancelJobWithSavepoint(jobId, 
"custom-directory")), any(FiniteDuration.class));
+
+               String location = 
String.format("/jobs/%s/cancel-with-savepoint/in-progress/1", jobId);
+
+               assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
+               assertEquals("application/json", 
response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
+               
assertEquals(Integer.toString(response.content().readableBytes()), 
response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
+               assertEquals(location, 
response.headers().get(HttpHeaders.Names.LOCATION));
+
+               String json = 
response.content().toString(Charset.forName("UTF-8"));
+               JsonNode root = new ObjectMapper().readTree(json);
+
+               assertEquals("accepted", root.get("status").getValueAsText());
+               assertEquals("1", root.get("request-id").getValueAsText());
+               assertEquals(location, root.get("location").getValueAsText());
+
+               // Trigger again
+               response = trigger.handleRequest(params, Collections.<String, 
String>emptyMap(), jobManager);
+               assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
+               assertEquals("application/json", 
response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
+               
assertEquals(Integer.toString(response.content().readableBytes()), 
response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
+               assertEquals(location, 
response.headers().get(HttpHeaders.Names.LOCATION));
+
+               json = response.content().toString(Charset.forName("UTF-8"));
+               root = new ObjectMapper().readTree(json);
+
+               assertEquals("accepted", root.get("status").getValueAsText());
+               assertEquals("1", root.get("request-id").getValueAsText());
+               assertEquals(location, root.get("location").getValueAsText());
+
+               // Only single actual request
+               verify(jobManager).ask(eq(new CancelJobWithSavepoint(jobId, 
"custom-directory")), any(FiniteDuration.class));
+
+               // Query progress
+               params.put("requestId", "1");
+
+               response = progress.handleRequest(params, Collections.<String, 
String>emptyMap(), jobManager);
+               assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
+               assertEquals("application/json", 
response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
+               
assertEquals(Integer.toString(response.content().readableBytes()), 
response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
+
+               json = response.content().toString(Charset.forName("UTF-8"));
+               root = new ObjectMapper().readTree(json);
+
+               assertEquals("in-progress", 
root.get("status").getValueAsText());
+               assertEquals("1", root.get("request-id").getValueAsText());
+
+               // Complete
+               promise.success(new CancellationSuccess(jobId, 
"_path-savepoint_"));
+
+               response = progress.handleRequest(params, Collections.<String, 
String>emptyMap(), jobManager);
+
+               assertEquals(HttpResponseStatus.CREATED, response.getStatus());
+               assertEquals("application/json", 
response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
+               
assertEquals(Integer.toString(response.content().readableBytes()), 
response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
+
+               json = response.content().toString(Charset.forName("UTF-8"));
+
+               root = new ObjectMapper().readTree(json);
+
+               assertEquals("success", root.get("status").getValueAsText());
+               assertEquals("1", root.get("request-id").getValueAsText());
+               assertEquals("_path-savepoint_", 
root.get("savepoint-path").getValueAsText());
+
+               // Query again, keep recent history
+
+               response = progress.handleRequest(params, Collections.<String, 
String>emptyMap(), jobManager);
+
+               assertEquals(HttpResponseStatus.CREATED, response.getStatus());
+               assertEquals("application/json", 
response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
+               
assertEquals(Integer.toString(response.content().readableBytes()), 
response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
+
+               json = response.content().toString(Charset.forName("UTF-8"));
+
+               root = new ObjectMapper().readTree(json);
+
+               assertEquals("success", root.get("status").getValueAsText());
+               assertEquals("1", root.get("request-id").getValueAsText());
+               assertEquals("_path-savepoint_", 
root.get("savepoint-path").getValueAsText());
+
+               // Query for unknown request
+               params.put("requestId", "9929");
+
+               response = progress.handleRequest(params, Collections.<String, 
String>emptyMap(), jobManager);
+               assertEquals(HttpResponseStatus.BAD_REQUEST, 
response.getStatus());
+               assertEquals("application/json", 
response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
+               
assertEquals(Integer.toString(response.content().readableBytes()), 
response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
+
+               json = response.content().toString(Charset.forName("UTF-8"));
+
+               root = new ObjectMapper().readTree(json);
+
+               assertEquals("failed", root.get("status").getValueAsText());
+               assertEquals("9929", root.get("request-id").getValueAsText());
+               assertEquals("Unknown job/request ID", 
root.get("cause").getValueAsText());
+       }
+
+       /**
+        * Tests response when a request fails.
+        */
+       @Test
+       public void testFailedCancellation() throws Exception {
+               JobID jobId = new JobID();
+               ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class);
+               ExecutionGraph graph = mock(ExecutionGraph.class);
+               CheckpointCoordinator coord = mock(CheckpointCoordinator.class);
+               when(holder.getExecutionGraph(eq(jobId), 
any(ActorGateway.class))).thenReturn(graph);
+               when(graph.getCheckpointCoordinator()).thenReturn(coord);
+
+               JobCancellationWithSavepointHandlers handlers = new 
JobCancellationWithSavepointHandlers(holder, EC);
+               JobCancellationWithSavepointHandlers.TriggerHandler trigger = 
handlers.getTriggerHandler();
+               JobCancellationWithSavepointHandlers.InProgressHandler progress 
= handlers.getInProgressHandler();
+
+               Map<String, String> params = new HashMap<>();
+               params.put("jobid", jobId.toString());
+               params.put("targetDirectory", "custom-directory");
+
+               ActorGateway jobManager = mock(ActorGateway.class);
+
+               // Successful
+               Future<Object> future = Futures.failed(new Exception("Test 
Exception"));
+               when(jobManager.ask(any(Object.class), 
any(FiniteDuration.class))).thenReturn(future);
+
+               // Trigger
+               trigger.handleRequest(params, Collections.<String, 
String>emptyMap(), jobManager);
+               verify(jobManager).ask(eq(new CancelJobWithSavepoint(jobId, 
"custom-directory")), any(FiniteDuration.class));
+
+               // Query progress
+               params.put("requestId", "1");
+
+               FullHttpResponse response = progress.handleRequest(params, 
Collections.<String, String>emptyMap(), jobManager);
+               assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, 
response.getStatus());
+               assertEquals("application/json", 
response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
+               
assertEquals(Integer.toString(response.content().readableBytes()), 
response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
+
+               String json = 
response.content().toString(Charset.forName("UTF-8"));
+               JsonNode root = new ObjectMapper().readTree(json);
+
+               assertEquals("failed", root.get("status").getValueAsText());
+               assertEquals("1", root.get("request-id").getValueAsText());
+               assertEquals("Test Exception", 
root.get("cause").getValueAsText());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3bc9cad0/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
index 13a9067..fe7ceef 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
@@ -100,7 +100,7 @@ public class AbstractMetricsHandlerTest extends TestLogger {
                pathParams.put("jobid", "nonexistent");
 
                try {
-                       assertEquals("", handler.handleRequest(pathParams, 
queryParams, null));
+                       assertEquals("", handler.handleJsonRequest(pathParams, 
queryParams, null));
                } catch (Exception e) {
                        fail();
                }
@@ -126,7 +126,7 @@ public class AbstractMetricsHandlerTest extends TestLogger {
                queryParams.put("get", "");
 
                try {
-                       assertEquals("", handler.handleRequest(pathParams, 
queryParams, null));
+                       assertEquals("", handler.handleJsonRequest(pathParams, 
queryParams, null));
                } catch (Exception e) {
                        fail(e.getMessage());
                }
@@ -136,7 +136,7 @@ public class AbstractMetricsHandlerTest extends TestLogger {
                queryParams.put("get", "subindex.opname.abc.metric5");
 
                try {
-                       assertEquals("", handler.handleRequest(pathParams, 
queryParams, null));
+                       assertEquals("", handler.handleJsonRequest(pathParams, 
queryParams, null));
                } catch (Exception e) {
                        fail(e.getMessage());
                }
@@ -146,7 +146,7 @@ public class AbstractMetricsHandlerTest extends TestLogger {
                queryParams.put("get", "subindex.opname.abc.nonexistant");
 
                try {
-                       assertEquals("", handler.handleRequest(pathParams, 
queryParams, null));
+                       assertEquals("", handler.handleJsonRequest(pathParams, 
queryParams, null));
                } catch (Exception e) {
                        fail(e.getMessage());
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/3bc9cad0/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 588ba84..698c2f0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -835,6 +835,10 @@ public class CheckpointCoordinator {
                return checkpointIdCounter;
        }
 
+       public long getCheckpointTimeout() {
+               return checkpointTimeout;
+       }
+
        // 
--------------------------------------------------------------------------------------------
        //  Periodic scheduling of checkpoints
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/3bc9cad0/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
index 0ff6ace..0fd97da 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
 import org.apache.flink.api.common.ArchivedExecutionConfig;
 import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -108,6 +109,14 @@ public interface AccessExecutionGraph {
        long getStatusTimestamp(JobStatus status);
 
        /**
+        * Returns the {@link CheckpointCoordinator} for this execution graph.
+        *
+        * @return CheckpointCoordinator for this execution graph or 
<code>null</code>
+        * if none is available.
+        */
+       CheckpointCoordinator getCheckpointCoordinator();
+
+       /**
         * Returns the {@link CheckpointStatsTracker} for this execution graph.
         *
         * @return CheckpointStatsTracker for thie execution graph

http://git-wip-us.apache.org/repos/asf/flink/blob/3bc9cad0/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
index 493825a..d8c58c8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.checkpoint.ArchivedCheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -198,6 +199,11 @@ public class ArchivedExecutionGraph implements 
AccessExecutionGraph, Serializabl
        }
 
        @Override
+       public CheckpointCoordinator getCheckpointCoordinator() {
+               return null;
+       }
+
+       @Override
        public CheckpointStatsTracker getCheckpointStatsTracker() {
                return tracker;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/3bc9cad0/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 101bdba..0a79cf2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -416,6 +416,7 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                }
        }
 
+       @Override
        public CheckpointCoordinator getCheckpointCoordinator() {
                return checkpointCoordinator;
        }

Reply via email to