http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
index 513dc08..1a7d868 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
@@ -20,11 +20,14 @@ package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Request handler for the CANCEL request.
@@ -36,7 +39,8 @@ public class JobCancellationHandler extends 
AbstractJsonRequestHandler {
 
        private final Time timeout;
 
-       public JobCancellationHandler(Time timeout) {
+       public JobCancellationHandler(Executor executor, Time timeout) {
+               super(executor);
                this.timeout = Preconditions.checkNotNull(timeout);
        }
 
@@ -46,19 +50,23 @@ public class JobCancellationHandler extends 
AbstractJsonRequestHandler {
        }
 
        @Override
-       public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws 
Exception {
-               try {
-                       JobID jobId = new 
JobID(StringUtils.hexStringToByte(pathParams.get("jobid")));
-                       if (jobManagerGateway != null) {
-                               jobManagerGateway.cancelJob(jobId, timeout);
-                               return "{}";
-                       }
-                       else {
-                               throw new Exception("No connection to the 
leading JobManager.");
-                       }
-               }
-               catch (Exception e) {
-                       throw new Exception("Failed to cancel the job with id: 
"  + pathParams.get("jobid") + e.getMessage(), e);
-               }
+       public CompletableFuture<String> handleJsonRequest(Map<String, String> 
pathParams, Map<String, String> queryParams, JobManagerGateway 
jobManagerGateway) {
+               return CompletableFuture.supplyAsync(
+                       () -> {
+                               try {
+                                       JobID jobId = new 
JobID(StringUtils.hexStringToByte(pathParams.get("jobid")));
+                                       if (jobManagerGateway != null) {
+                                               
jobManagerGateway.cancelJob(jobId, timeout);
+                                               return "{}";
+                                       }
+                                       else {
+                                               throw new Exception("No 
connection to the leading JobManager.");
+                                       }
+                               }
+                               catch (Exception e) {
+                                       throw new FlinkFutureException("Failed 
to cancel the job with id: "  + pathParams.get("jobid"), e);
+                               }
+                       },
+                       executor);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
index 9b474aa..4e41447 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
@@ -24,12 +24,13 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import 
org.apache.flink.runtime.messages.JobManagerMessages.CancelJobWithSavepoint;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.NotFoundException;
-import org.apache.flink.util.FlinkException;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
@@ -140,48 +141,48 @@ public class JobCancellationWithSavepointHandlers {
 
                @Override
                @SuppressWarnings("unchecked")
-               public FullHttpResponse handleRequest(
+               public CompletableFuture<FullHttpResponse> handleRequest(
                                Map<String, String> pathParams,
                                Map<String, String> queryParams,
-                               JobManagerGateway jobManagerGateway) throws 
Exception {
+                               JobManagerGateway jobManagerGateway) {
 
-                       try {
-                               if (jobManagerGateway != null) {
-                                       JobID jobId = 
JobID.fromHexString(pathParams.get("jobid"));
-                                       final Optional<AccessExecutionGraph> 
optGraph;
+                       if (jobManagerGateway != null) {
+                               JobID jobId = 
JobID.fromHexString(pathParams.get("jobid"));
+                               final 
CompletableFuture<Optional<AccessExecutionGraph>> graphFuture;
 
-                                       try {
-                                               optGraph = 
currentGraphs.getExecutionGraph(jobId, jobManagerGateway);
-                                       } catch (Exception e) {
-                                               throw new FlinkException("Could 
not retrieve the execution with jobId " + jobId + " from the JobManager.", e);
-                                       }
+                               graphFuture = 
currentGraphs.getExecutionGraph(jobId, jobManagerGateway);
 
-                                       final AccessExecutionGraph graph = 
optGraph.orElseThrow(
-                                               () -> new 
NotFoundException("Could not find ExecutionGraph with jobId " + jobId + '.'));
+                               return graphFuture.thenApplyAsync(
+                                       (Optional<AccessExecutionGraph> 
optGraph) -> {
+                                               final AccessExecutionGraph 
graph = optGraph.orElseThrow(
+                                                       () -> new 
FlinkFutureException(
+                                                               new 
NotFoundException("Could not find ExecutionGraph with jobId " + jobId + '.')));
 
-                                       CheckpointCoordinator coord = 
graph.getCheckpointCoordinator();
-                                       if (coord == null) {
-                                               throw new Exception("Cannot 
find CheckpointCoordinator for job.");
-                                       }
+                                               CheckpointCoordinator coord = 
graph.getCheckpointCoordinator();
+                                               if (coord == null) {
+                                                       throw new 
FlinkFutureException(new Exception("Cannot find CheckpointCoordinator for 
job."));
+                                               }
 
-                                       String targetDirectory = 
pathParams.get("targetDirectory");
-                                       if (targetDirectory == null) {
-                                               if (defaultSavepointDirectory 
== null) {
-                                                       throw new 
IllegalStateException("No savepoint directory configured. " +
+                                               String targetDirectory = 
pathParams.get("targetDirectory");
+                                               if (targetDirectory == null) {
+                                                       if 
(defaultSavepointDirectory == null) {
+                                                               throw new 
IllegalStateException("No savepoint directory configured. " +
                                                                        "You 
can either specify a directory when triggering this savepoint or " +
                                                                        
"configure a cluster-wide default via key '" +
                                                                        
CoreOptions.SAVEPOINT_DIRECTORY.key() + "'.");
-                                               } else {
-                                                       targetDirectory = 
defaultSavepointDirectory;
+                                                       } else {
+                                                               targetDirectory 
= defaultSavepointDirectory;
+                                                       }
                                                }
-                                       }
 
-                                       return 
handleNewRequest(jobManagerGateway, jobId, targetDirectory, 
coord.getCheckpointTimeout());
-                               } else {
-                                       throw new Exception("No connection to 
the leading JobManager.");
-                               }
-                       } catch (Exception e) {
-                               throw new Exception("Failed to cancel the job: 
" + e.getMessage(), e);
+                                               try {
+                                                       return 
handleNewRequest(jobManagerGateway, jobId, targetDirectory, 
coord.getCheckpointTimeout());
+                                               } catch (IOException e) {
+                                                       throw new 
FlinkFutureException("Could not cancel job with savepoint.", e);
+                                               }
+                                       }, executor);
+                       } else {
+                               return FutureUtils.completedExceptionally(new 
Exception("No connection to the leading JobManager."));
                        }
                }
 
@@ -288,64 +289,63 @@ public class JobCancellationWithSavepointHandlers {
 
                @Override
                @SuppressWarnings("unchecked")
-               public FullHttpResponse handleRequest(Map<String, String> 
pathParams, Map<String, String> queryParams, JobManagerGateway 
jobManagerGateway) throws Exception {
-                       try {
-                               if (jobManagerGateway != null) {
-                                       JobID jobId = 
JobID.fromHexString(pathParams.get("jobid"));
-                                       long requestId = 
Long.parseLong(pathParams.get("requestId"));
+               public CompletableFuture<FullHttpResponse> 
handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, 
JobManagerGateway jobManagerGateway) {
+                       JobID jobId = 
JobID.fromHexString(pathParams.get("jobid"));
+                       long requestId = 
Long.parseLong(pathParams.get("requestId"));
 
-                                       synchronized (lock) {
-                                               Object result = 
completed.remove(requestId);
-
-                                               if (result != null) {
-                                                       // Add to recent history
-                                                       
recentlyCompleted.add(new Tuple2<>(requestId, result));
-                                                       if 
(recentlyCompleted.size() > NUM_GHOST_REQUEST_IDS) {
-                                                               
recentlyCompleted.remove();
-                                                       }
+                       return CompletableFuture.supplyAsync(
+                               () -> {
+                                       try {
+                                               synchronized (lock) {
+                                                       Object result = 
completed.remove(requestId);
+
+                                                       if (result != null) {
+                                                               // Add to 
recent history
+                                                               
recentlyCompleted.add(new Tuple2<>(requestId, result));
+                                                               if 
(recentlyCompleted.size() > NUM_GHOST_REQUEST_IDS) {
+                                                                       
recentlyCompleted.remove();
+                                                               }
 
-                                                       if (result.getClass() 
== String.class) {
-                                                               String 
savepointPath = (String) result;
-                                                               return 
createSuccessResponse(requestId, savepointPath);
-                                                       } else {
-                                                               Throwable cause 
= (Throwable) result;
-                                                               return 
createFailureResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR, requestId, 
cause.getMessage());
-                                                       }
-                                               } else {
-                                                       // Check in-progress
-                                                       Long 
inProgressRequestId = inProgress.get(jobId);
-                                                       if (inProgressRequestId 
!= null) {
-                                                               // Sanity check
-                                                               if 
(inProgressRequestId == requestId) {
-                                                                       return 
createInProgressResponse(requestId);
+                                                               if 
(result.getClass() == String.class) {
+                                                                       String 
savepointPath = (String) result;
+                                                                       return 
createSuccessResponse(requestId, savepointPath);
                                                                } else {
-                                                                       String 
msg = "Request ID does not belong to JobID";
-                                                                       return 
createFailureResponse(HttpResponseStatus.BAD_REQUEST, requestId, msg);
+                                                                       
Throwable cause = (Throwable) result;
+                                                                       return 
createFailureResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR, requestId, 
cause.getMessage());
                                                                }
-                                                       }
-
-                                                       // Check recent history
-                                                       for (Tuple2<Long, 
Object> recent : recentlyCompleted) {
-                                                               if (recent.f0 
== requestId) {
-                                                                       if 
(recent.f1.getClass() == String.class) {
-                                                                               
String savepointPath = (String) recent.f1;
-                                                                               
return createSuccessResponse(requestId, savepointPath);
+                                                       } else {
+                                                               // Check 
in-progress
+                                                               Long 
inProgressRequestId = inProgress.get(jobId);
+                                                               if 
(inProgressRequestId != null) {
+                                                                       // 
Sanity check
+                                                                       if 
(inProgressRequestId == requestId) {
+                                                                               
return createInProgressResponse(requestId);
                                                                        } else {
-                                                                               
Throwable cause = (Throwable) recent.f1;
-                                                                               
return createFailureResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR, 
requestId, cause.getMessage());
+                                                                               
String msg = "Request ID does not belong to JobID";
+                                                                               
return createFailureResponse(HttpResponseStatus.BAD_REQUEST, requestId, msg);
                                                                        }
                                                                }
-                                                       }
 
-                                                       return 
createFailureResponse(HttpResponseStatus.BAD_REQUEST, requestId, "Unknown 
job/request ID");
+                                                               // Check recent 
history
+                                                               for 
(Tuple2<Long, Object> recent : recentlyCompleted) {
+                                                                       if 
(recent.f0 == requestId) {
+                                                                               
if (recent.f1.getClass() == String.class) {
+                                                                               
        String savepointPath = (String) recent.f1;
+                                                                               
        return createSuccessResponse(requestId, savepointPath);
+                                                                               
} else {
+                                                                               
        Throwable cause = (Throwable) recent.f1;
+                                                                               
        return createFailureResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR, 
requestId, cause.getMessage());
+                                                                               
}
+                                                                       }
+                                                               }
+
+                                                               return 
createFailureResponse(HttpResponseStatus.BAD_REQUEST, requestId, "Unknown 
job/request ID");
+                                                       }
                                                }
+                                       } catch (Exception e) {
+                                               throw new 
FlinkFutureException("Could not handle in progress request.", e);
                                        }
-                               } else {
-                                       throw new Exception("No connection to 
the leading JobManager.");
-                               }
-                       } catch (Exception e) {
-                               throw new Exception("Failed to cancel the job: 
" + e.getMessage(), e);
-                       }
+                               });
                }
 
                private FullHttpResponse createSuccessResponse(long requestId, 
String savepointPath) throws IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
index 72cf8b7..0b15b37 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.ArchivedExecutionConfig;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
@@ -31,6 +32,8 @@ import java.io.StringWriter;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Request handler that returns the execution config of a job.
@@ -39,8 +42,8 @@ public class JobConfigHandler extends 
AbstractExecutionGraphRequestHandler {
 
        private static final String JOB_CONFIG_REST_PATH = 
"/jobs/:jobid/config";
 
-       public JobConfigHandler(ExecutionGraphHolder executionGraphHolder) {
-               super(executionGraphHolder);
+       public JobConfigHandler(ExecutionGraphHolder executionGraphHolder, 
Executor executor) {
+               super(executionGraphHolder, executor);
        }
 
        @Override
@@ -49,8 +52,17 @@ public class JobConfigHandler extends 
AbstractExecutionGraphRequestHandler {
        }
 
        @Override
-       public String handleRequest(AccessExecutionGraph graph, Map<String, 
String> params) throws Exception {
-               return createJobConfigJson(graph);
+       public CompletableFuture<String> handleRequest(AccessExecutionGraph 
graph, Map<String, String> params) {
+               return CompletableFuture.supplyAsync(
+                       () -> {
+                               try {
+                                       return createJobConfigJson(graph);
+                               } catch (IOException e) {
+                                       throw new FlinkFutureException("Could 
not write job config json.", e);
+                               }
+                       },
+                       executor);
+
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
index 87ac7c3..8a50f87 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -39,6 +40,8 @@ import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Request handler that returns details about a job. This includes:
@@ -57,8 +60,8 @@ public class JobDetailsHandler extends 
AbstractExecutionGraphRequestHandler {
 
        private final MetricFetcher fetcher;
 
-       public JobDetailsHandler(ExecutionGraphHolder executionGraphHolder, 
MetricFetcher fetcher) {
-               super(executionGraphHolder);
+       public JobDetailsHandler(ExecutionGraphHolder executionGraphHolder, 
Executor executor, MetricFetcher fetcher) {
+               super(executionGraphHolder, executor);
                this.fetcher = fetcher;
        }
 
@@ -68,8 +71,16 @@ public class JobDetailsHandler extends 
AbstractExecutionGraphRequestHandler {
        }
 
        @Override
-       public String handleRequest(AccessExecutionGraph graph, Map<String, 
String> params) throws Exception {
-               return createJobDetailsJson(graph, fetcher);
+       public CompletableFuture<String> handleRequest(AccessExecutionGraph 
graph, Map<String, String> params) {
+               return CompletableFuture.supplyAsync(
+                       () -> {
+                               try {
+                                       return createJobDetailsJson(graph, 
fetcher);
+                               } catch (IOException e) {
+                                       throw new FlinkFutureException("Could 
not create job details json.", e);
+                               }
+                       },
+                       executor);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
index e31299b..6ffd443 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
@@ -35,6 +36,8 @@ import java.io.StringWriter;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Request handler that returns the configuration of a job.
@@ -45,8 +48,8 @@ public class JobExceptionsHandler extends 
AbstractExecutionGraphRequestHandler {
 
        static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
 
-       public JobExceptionsHandler(ExecutionGraphHolder executionGraphHolder) {
-               super(executionGraphHolder);
+       public JobExceptionsHandler(ExecutionGraphHolder executionGraphHolder, 
Executor executor) {
+               super(executionGraphHolder, executor);
        }
 
        @Override
@@ -55,8 +58,17 @@ public class JobExceptionsHandler extends 
AbstractExecutionGraphRequestHandler {
        }
 
        @Override
-       public String handleRequest(AccessExecutionGraph graph, Map<String, 
String> params) throws Exception {
-               return createJobExceptionsJson(graph);
+       public CompletableFuture<String> handleRequest(AccessExecutionGraph 
graph, Map<String, String> params) {
+               return CompletableFuture.supplyAsync(
+                       () -> {
+                               try {
+                                       return createJobExceptionsJson(graph);
+                               } catch (IOException e) {
+                                       throw new FlinkFutureException("Could 
not create job exceptions json.", e);
+                               }
+                       },
+                       executor
+               );
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
index e2437e6..cb6d8c0 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
@@ -19,12 +19,16 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 
+import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Returns the Job Manager's configuration.
@@ -35,7 +39,8 @@ public class JobManagerConfigHandler extends 
AbstractJsonRequestHandler {
 
        private final Configuration config;
 
-       public JobManagerConfigHandler(Configuration config) {
+       public JobManagerConfigHandler(Executor executor, Configuration config) 
{
+               super(executor);
                this.config = config;
        }
 
@@ -45,31 +50,38 @@ public class JobManagerConfigHandler extends 
AbstractJsonRequestHandler {
        }
 
        @Override
-       public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws 
Exception {
-               StringWriter writer = new StringWriter();
-               JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+       public CompletableFuture<String> handleJsonRequest(Map<String, String> 
pathParams, Map<String, String> queryParams, JobManagerGateway 
jobManagerGateway) {
+               return CompletableFuture.supplyAsync(
+                       () -> {
+                               try {
+                                       StringWriter writer = new 
StringWriter();
+                                       JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
 
-               gen.writeStartArray();
-               for (String key : config.keySet()) {
-                       gen.writeStartObject();
-                       gen.writeStringField("key", key);
+                                       gen.writeStartArray();
+                                       for (String key : config.keySet()) {
+                                               gen.writeStartObject();
+                                               gen.writeStringField("key", 
key);
 
-                       // Mask key values which contain sensitive information
-                       if (key.toLowerCase().contains("password")) {
-                               String value = config.getString(key, null);
-                               if (value != null) {
-                                       value = "******";
-                               }
-                               gen.writeStringField("value", value);
-                       }
-                       else {
-                               gen.writeStringField("value", 
config.getString(key, null));
-                       }
-                       gen.writeEndObject();
-               }
-               gen.writeEndArray();
+                                               // Mask key values which 
contain sensitive information
+                                               if 
(key.toLowerCase().contains("password")) {
+                                                       String value = 
config.getString(key, null);
+                                                       if (value != null) {
+                                                               value = 
"******";
+                                                       }
+                                                       
gen.writeStringField("value", value);
+                                               } else {
+                                                       
gen.writeStringField("value", config.getString(key, null));
+                                               }
+                                               gen.writeEndObject();
+                                       }
+                                       gen.writeEndArray();
 
-               gen.close();
-               return writer.toString();
+                                       gen.close();
+                                       return writer.toString();
+                               } catch (IOException e) {
+                                       throw new FlinkFutureException("Could 
not write configuration.", e);
+                               }
+                       },
+                       executor);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
index d17b6bb..b3a9dd5 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
@@ -27,6 +27,8 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Request handler that returns the JSON program plan of a job graph.
@@ -35,8 +37,8 @@ public class JobPlanHandler extends 
AbstractExecutionGraphRequestHandler {
 
        private static final String JOB_PLAN_REST_PATH = "/jobs/:jobid/plan";
 
-       public JobPlanHandler(ExecutionGraphHolder executionGraphHolder) {
-               super(executionGraphHolder);
+       public JobPlanHandler(ExecutionGraphHolder executionGraphHolder, 
Executor executor) {
+               super(executionGraphHolder, executor);
        }
 
        @Override
@@ -45,8 +47,8 @@ public class JobPlanHandler extends 
AbstractExecutionGraphRequestHandler {
        }
 
        @Override
-       public String handleRequest(AccessExecutionGraph graph, Map<String, 
String> params) throws Exception {
-               return graph.getJsonPlan();
+       public CompletableFuture<String> handleRequest(AccessExecutionGraph 
graph, Map<String, String> params) {
+               return CompletableFuture.completedFuture(graph.getJsonPlan());
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
index 3526734..f63403f 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
@@ -20,11 +20,14 @@ package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Request handler for the STOP request.
@@ -36,7 +39,8 @@ public class JobStoppingHandler extends 
AbstractJsonRequestHandler {
 
        private final Time timeout;
 
-       public JobStoppingHandler(Time timeout) {
+       public JobStoppingHandler(Executor executor, Time timeout) {
+               super(executor);
                this.timeout = Preconditions.checkNotNull(timeout);
        }
 
@@ -46,19 +50,23 @@ public class JobStoppingHandler extends 
AbstractJsonRequestHandler {
        }
 
        @Override
-       public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws 
Exception {
-               try {
-                       JobID jobId = new 
JobID(StringUtils.hexStringToByte(pathParams.get("jobid")));
-                       if (jobManagerGateway != null) {
-                               jobManagerGateway.stopJob(jobId, timeout);
-                               return "{}";
-                       }
-                       else {
-                               throw new Exception("No connection to the 
leading JobManager.");
-                       }
-               }
-               catch (Exception e) {
-                       throw new Exception("Failed to stop the job with id: "  
+ pathParams.get("jobid") + e.getMessage(), e);
-               }
+       public CompletableFuture<String> handleJsonRequest(Map<String, String> 
pathParams, Map<String, String> queryParams, JobManagerGateway 
jobManagerGateway) {
+               return CompletableFuture.supplyAsync(
+                       () -> {
+                               try {
+                                       JobID jobId = new 
JobID(StringUtils.hexStringToByte(pathParams.get("jobid")));
+                                       if (jobManagerGateway != null) {
+                                               
jobManagerGateway.stopJob(jobId, timeout);
+                                               return "{}";
+                                       }
+                                       else {
+                                               throw new Exception("No 
connection to the leading JobManager.");
+                                       }
+                               }
+                               catch (Exception e) {
+                                       throw new FlinkFutureException("Failed 
to stop the job with id: "  + pathParams.get("jobid") + '.', e);
+                               }
+                       },
+                       executor);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
index 8e90dfc..9c613ff 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
@@ -33,6 +34,8 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Request handler that returns the accummulators for a given vertex.
@@ -41,8 +44,8 @@ public class JobVertexAccumulatorsHandler extends 
AbstractJobVertexRequestHandle
 
        private static final String JOB_VERTEX_ACCUMULATORS_REST_PATH = 
"/jobs/:jobid/vertices/:vertexid/accumulators";
 
-       public JobVertexAccumulatorsHandler(ExecutionGraphHolder 
executionGraphHolder) {
-               super(executionGraphHolder);
+       public JobVertexAccumulatorsHandler(ExecutionGraphHolder 
executionGraphHolder, Executor executor) {
+               super(executionGraphHolder, executor);
        }
 
        @Override
@@ -51,8 +54,17 @@ public class JobVertexAccumulatorsHandler extends 
AbstractJobVertexRequestHandle
        }
 
        @Override
-       public String handleRequest(AccessExecutionJobVertex jobVertex, 
Map<String, String> params) throws Exception {
-               return createVertexAccumulatorsJson(jobVertex);
+       public CompletableFuture<String> handleRequest(AccessExecutionJobVertex 
jobVertex, Map<String, String> params) {
+               return CompletableFuture.supplyAsync(
+                       () -> {
+                               try {
+                                       return 
createVertexAccumulatorsJson(jobVertex);
+                               } catch (IOException e) {
+                                       throw new FlinkFutureException("Could 
not create job vertex accumulators json.", e);
+                               }
+                       },
+                       executor);
+
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
index cde8ca9..963153f 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
@@ -27,8 +28,11 @@ import 
org.apache.flink.runtime.webmonitor.OperatorBackPressureStats;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 
+import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 import scala.Option;
 
@@ -51,10 +55,11 @@ public class JobVertexBackPressureHandler extends 
AbstractJobVertexRequestHandle
 
        public JobVertexBackPressureHandler(
                        ExecutionGraphHolder executionGraphHolder,
+                       Executor executor,
                        BackPressureStatsTracker backPressureStatsTracker,
                        int refreshInterval) {
 
-               super(executionGraphHolder);
+               super(executionGraphHolder, executor);
                this.backPressureStatsTracker = 
checkNotNull(backPressureStatsTracker, "Stats tracker");
                checkArgument(refreshInterval >= 0, "Negative timeout");
                this.refreshInterval = refreshInterval;
@@ -66,11 +71,11 @@ public class JobVertexBackPressureHandler extends 
AbstractJobVertexRequestHandle
        }
 
        @Override
-       public String handleRequest(
+       public CompletableFuture<String> handleRequest(
                        AccessExecutionJobVertex accessJobVertex,
-                       Map<String, String> params) throws Exception {
+                       Map<String, String> params) {
                if (accessJobVertex instanceof ArchivedExecutionJobVertex) {
-                       return "";
+                       return CompletableFuture.completedFuture("");
                }
                ExecutionJobVertex jobVertex = (ExecutionJobVertex) 
accessJobVertex;
                try (StringWriter writer = new StringWriter();
@@ -116,7 +121,9 @@ public class JobVertexBackPressureHandler extends 
AbstractJobVertexRequestHandle
                        gen.writeEndObject();
                        gen.close();
 
-                       return writer.toString();
+                       return 
CompletableFuture.completedFuture(writer.toString());
+               } catch (IOException e) {
+                       return FutureUtils.completedExceptionally(e);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
index 7757fdd..bd1745c 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -39,6 +40,8 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * A request handler that provides the details of a job vertex, including id, 
name, parallelism,
@@ -50,8 +53,8 @@ public class JobVertexDetailsHandler extends 
AbstractJobVertexRequestHandler {
 
        private final MetricFetcher fetcher;
 
-       public JobVertexDetailsHandler(ExecutionGraphHolder 
executionGraphHolder, MetricFetcher fetcher) {
-               super(executionGraphHolder);
+       public JobVertexDetailsHandler(ExecutionGraphHolder 
executionGraphHolder, Executor executor, MetricFetcher fetcher) {
+               super(executionGraphHolder, executor);
                this.fetcher = fetcher;
        }
 
@@ -61,8 +64,16 @@ public class JobVertexDetailsHandler extends 
AbstractJobVertexRequestHandler {
        }
 
        @Override
-       public String handleRequest(AccessExecutionJobVertex jobVertex, 
Map<String, String> params) throws Exception {
-               return createVertexDetailsJson(jobVertex, params.get("jobid"), 
fetcher);
+       public CompletableFuture<String> handleRequest(AccessExecutionJobVertex 
jobVertex, Map<String, String> params) {
+               return CompletableFuture.supplyAsync(
+                       () -> {
+                               try {
+                                       return 
createVertexDetailsJson(jobVertex, params.get("jobid"), fetcher);
+                               } catch (IOException e) {
+                                       throw new FlinkFutureException("Could 
not write the vertex details json.", e);
+                               }
+                       },
+                       executor);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
index a612782..0827720 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -41,6 +42,8 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * A request handler that provides the details of a job vertex, including id, 
name, and the
@@ -52,8 +55,8 @@ public class JobVertexTaskManagersHandler extends 
AbstractJobVertexRequestHandle
 
        private final MetricFetcher fetcher;
 
-       public JobVertexTaskManagersHandler(ExecutionGraphHolder 
executionGraphHolder, MetricFetcher fetcher) {
-               super(executionGraphHolder);
+       public JobVertexTaskManagersHandler(ExecutionGraphHolder 
executionGraphHolder, Executor executor, MetricFetcher fetcher) {
+               super(executionGraphHolder, executor);
                this.fetcher = fetcher;
        }
 
@@ -63,8 +66,16 @@ public class JobVertexTaskManagersHandler extends 
AbstractJobVertexRequestHandle
        }
 
        @Override
-       public String handleRequest(AccessExecutionJobVertex jobVertex, 
Map<String, String> params) throws Exception {
-               return createVertexDetailsByTaskManagerJson(jobVertex, 
params.get("jobid"), fetcher);
+       public CompletableFuture<String> handleRequest(AccessExecutionJobVertex 
jobVertex, Map<String, String> params) {
+               return CompletableFuture.supplyAsync(
+                       () -> {
+                               try {
+                                       return 
createVertexDetailsByTaskManagerJson(jobVertex, params.get("jobid"), fetcher);
+                               } catch (IOException e) {
+                                       throw new FlinkFutureException("Could 
not create TaskManager json.", e);
+                               }
+                       },
+                       executor);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
index 079be8f..8ca785f 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
@@ -19,11 +19,11 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.webmonitor.NotFoundException;
 
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
 
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * Base interface for all request handlers.
@@ -44,13 +44,8 @@ public interface RequestHandler {
         * @param jobManagerGateway to talk to the JobManager.
         *
         * @return The full http response.
-        *
-        * @throws Exception Handlers may forward exceptions. Exceptions of type
-        *         {@link NotFoundException} will cause a HTTP 404
-        *         response with the exception message, other exceptions will 
cause a HTTP 500 response
-        *         with the exception stack trace.
         */
-       FullHttpResponse handleRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws 
Exception;
+       CompletableFuture<FullHttpResponse> handleRequest(Map<String, String> 
pathParams, Map<String, String> queryParams, JobManagerGateway 
jobManagerGateway);
 
        /**
         * Returns an array of REST URL's under which this handler can be 
registered.

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
index 28e9ddf..301b217 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
@@ -23,6 +23,8 @@ import 
org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
 
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Request handler providing details about a single task execution attempt.
@@ -31,8 +33,8 @@ public class SubtaskCurrentAttemptDetailsHandler extends 
SubtaskExecutionAttempt
 
        public static final String SUBTASK_CURRENT_ATTEMPT_DETAILS_REST_PATH = 
"/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum";
 
-       public SubtaskCurrentAttemptDetailsHandler(ExecutionGraphHolder 
executionGraphHolder, MetricFetcher fetcher) {
-               super(executionGraphHolder, fetcher);
+       public SubtaskCurrentAttemptDetailsHandler(ExecutionGraphHolder 
executionGraphHolder, Executor executor, MetricFetcher fetcher) {
+               super(executionGraphHolder, executor, fetcher);
        }
 
        @Override
@@ -41,7 +43,7 @@ public class SubtaskCurrentAttemptDetailsHandler extends 
SubtaskExecutionAttempt
        }
 
        @Override
-       public String handleRequest(AccessExecutionVertex vertex, Map<String, 
String> params) throws Exception {
+       public CompletableFuture<String> handleRequest(AccessExecutionVertex 
vertex, Map<String, String> params) {
                return handleRequest(vertex.getCurrentExecutionAttempt(), 
params);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
index 171277f..3c0d1d9 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.executiongraph.AccessExecution;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -35,6 +36,8 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Base class for request handlers whose response depends on a specific job 
vertex (defined
@@ -44,8 +47,8 @@ public class SubtaskExecutionAttemptAccumulatorsHandler 
extends AbstractSubtaskA
 
        private static final String SUBTASK_ATTEMPT_ACCUMULATORS_REST_PATH = 
"/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators";
 
-       public SubtaskExecutionAttemptAccumulatorsHandler(ExecutionGraphHolder 
executionGraphHolder) {
-               super(executionGraphHolder);
+       public SubtaskExecutionAttemptAccumulatorsHandler(ExecutionGraphHolder 
executionGraphHolder, Executor executor) {
+               super(executionGraphHolder, executor);
        }
 
        @Override
@@ -54,8 +57,16 @@ public class SubtaskExecutionAttemptAccumulatorsHandler 
extends AbstractSubtaskA
        }
 
        @Override
-       public String handleRequest(AccessExecution execAttempt, Map<String, 
String> params) throws Exception {
-               return createAttemptAccumulatorsJson(execAttempt);
+       public CompletableFuture<String> handleRequest(AccessExecution 
execAttempt, Map<String, String> params) {
+               return CompletableFuture.supplyAsync(
+                       () -> {
+                               try {
+                                       return 
createAttemptAccumulatorsJson(execAttempt);
+                               } catch (IOException e) {
+                                       throw new FlinkFutureException("Could 
not create accumulator json.", e);
+                               }
+                       },
+                       executor);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
index 37c0e50..ad836df 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecution;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
@@ -40,6 +41,8 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 import static 
org.apache.flink.runtime.webmonitor.handlers.SubtaskCurrentAttemptDetailsHandler.SUBTASK_CURRENT_ATTEMPT_DETAILS_REST_PATH;
 
@@ -52,8 +55,8 @@ public class SubtaskExecutionAttemptDetailsHandler extends 
AbstractSubtaskAttemp
 
        private final MetricFetcher fetcher;
 
-       public SubtaskExecutionAttemptDetailsHandler(ExecutionGraphHolder 
executionGraphHolder, MetricFetcher fetcher) {
-               super(executionGraphHolder);
+       public SubtaskExecutionAttemptDetailsHandler(ExecutionGraphHolder 
executionGraphHolder, Executor executor, MetricFetcher fetcher) {
+               super(executionGraphHolder, executor);
                this.fetcher = fetcher;
        }
 
@@ -63,8 +66,16 @@ public class SubtaskExecutionAttemptDetailsHandler extends 
AbstractSubtaskAttemp
        }
 
        @Override
-       public String handleRequest(AccessExecution execAttempt, Map<String, 
String> params) throws Exception {
-               return createAttemptDetailsJson(execAttempt, 
params.get("jobid"), params.get("vertexid"), fetcher);
+       public CompletableFuture<String> handleRequest(AccessExecution 
execAttempt, Map<String, String> params) {
+               return CompletableFuture.supplyAsync(
+                       () -> {
+                               try {
+                                       return 
createAttemptDetailsJson(execAttempt, params.get("jobid"), 
params.get("vertexid"), fetcher);
+                               } catch (IOException e) {
+                                       throw new FlinkFutureException("Could 
not create attempt details json.", e);
+                               }
+                       },
+                       executor);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
index 64bdfb4..8142548 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
@@ -35,6 +36,8 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Request handler that returns the accumulators for all subtasks of job 
vertex.
@@ -43,8 +46,8 @@ public class SubtasksAllAccumulatorsHandler extends 
AbstractJobVertexRequestHand
 
        private static final String SUBTASKS_ALL_ACCUMULATORS_REST_PATH =       
"/jobs/:jobid/vertices/:vertexid/subtasks/accumulators";
 
-       public SubtasksAllAccumulatorsHandler(ExecutionGraphHolder 
executionGraphHolder) {
-               super(executionGraphHolder);
+       public SubtasksAllAccumulatorsHandler(ExecutionGraphHolder 
executionGraphHolder, Executor executor) {
+               super(executionGraphHolder, executor);
        }
 
        @Override
@@ -53,8 +56,16 @@ public class SubtasksAllAccumulatorsHandler extends 
AbstractJobVertexRequestHand
        }
 
        @Override
-       public String handleRequest(AccessExecutionJobVertex jobVertex, 
Map<String, String> params) throws Exception {
-               return createSubtasksAccumulatorsJson(jobVertex);
+       public CompletableFuture<String> handleRequest(AccessExecutionJobVertex 
jobVertex, Map<String, String> params) {
+               return CompletableFuture.supplyAsync(
+                       () -> {
+                               try {
+                                       return 
createSubtasksAccumulatorsJson(jobVertex);
+                               } catch (IOException e) {
+                                       throw new FlinkFutureException("Could 
not create subtasks accumulator json.", e);
+                               }
+                       },
+                       executor);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
index ea88587..d766206 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -35,6 +36,8 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Request handler that returns the state transition timestamps for all 
subtasks, plus their
@@ -44,8 +47,8 @@ public class SubtasksTimesHandler extends 
AbstractJobVertexRequestHandler {
 
        private static final String SUBTASK_TIMES_REST_PATH =   
"/jobs/:jobid/vertices/:vertexid/subtasktimes";
 
-       public SubtasksTimesHandler(ExecutionGraphHolder executionGraphHolder) {
-               super(executionGraphHolder);
+       public SubtasksTimesHandler(ExecutionGraphHolder executionGraphHolder, 
Executor executor) {
+               super(executionGraphHolder, executor);
        }
 
        @Override
@@ -54,8 +57,16 @@ public class SubtasksTimesHandler extends 
AbstractJobVertexRequestHandler {
        }
 
        @Override
-       public String handleRequest(AccessExecutionJobVertex jobVertex, 
Map<String, String> params) throws Exception {
-               return createSubtaskTimesJson(jobVertex);
+       public CompletableFuture<String> handleRequest(AccessExecutionJobVertex 
jobVertex, Map<String, String> params) {
+               return CompletableFuture.supplyAsync(
+                       () -> {
+                               try {
+                                       return 
createSubtaskTimesJson(jobVertex);
+                               } catch (IOException e) {
+                                       throw new FlinkFutureException("Could 
not write subtask time json.", e);
+                               }
+                       },
+                       executor);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
index a8ab7a3..9f83ed0 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
@@ -28,14 +30,14 @@ import org.apache.flink.util.StringUtils;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 
+import java.io.IOException;
 import java.io.StringWriter;
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.List;
+import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Executor;
 
 import static java.util.Objects.requireNonNull;
 
@@ -53,7 +55,8 @@ public class TaskManagersHandler extends 
AbstractJsonRequestHandler  {
 
        private final MetricFetcher fetcher;
 
-       public TaskManagersHandler(Time timeout, MetricFetcher fetcher) {
+       public TaskManagersHandler(Executor executor, Time timeout, 
MetricFetcher fetcher) {
+               super(executor);
                this.timeout = requireNonNull(timeout);
                this.fetcher = fetcher;
        }
@@ -64,134 +67,139 @@ public class TaskManagersHandler extends 
AbstractJsonRequestHandler  {
        }
 
        @Override
-       public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws 
Exception {
-               try {
-                       if (jobManagerGateway != null) {
-                               // whether one task manager's metrics are 
requested, or all task manager, we
-                               // return them in an array. This avoids 
unnecessary code complexity.
-                               // If only one task manager is requested, we 
only fetch one task manager metrics.
-                               final List<Instance> instances = new 
ArrayList<>();
-                               if 
(pathParams.containsKey(TASK_MANAGER_ID_KEY)) {
-                                       try {
-                                               InstanceID instanceID = new 
InstanceID(StringUtils.hexStringToByte(pathParams.get(TASK_MANAGER_ID_KEY)));
-                                               
CompletableFuture<Optional<Instance>> tmInstanceFuture = 
jobManagerGateway.requestTaskManagerInstance(instanceID, timeout);
-
-                                               Optional<Instance> instance = 
tmInstanceFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
-
-                                               
instance.ifPresent(instances::add);
-                                       }
-                                       // this means the id string was 
invalid. Keep the list empty.
-                                       catch (IllegalArgumentException e){
-                                               // do nothing.
-                                       }
-                               } else {
-                                       CompletableFuture<Collection<Instance>> 
tmInstancesFuture = jobManagerGateway.requestTaskManagerInstances(timeout);
-
-                                       Collection<Instance> tmInstances = 
tmInstancesFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
-                                       instances.addAll(tmInstances);
-                               }
+       public CompletableFuture<String> handleJsonRequest(Map<String, String> 
pathParams, Map<String, String> queryParams, JobManagerGateway 
jobManagerGateway) {
+               if (jobManagerGateway != null) {
+                       // whether one task manager's metrics are requested, or 
all task manager, we
+                       // return them in an array. This avoids unnecessary 
code complexity.
+                       // If only one task manager is requested, we only fetch 
one task manager metrics.
+                       if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) {
+                               InstanceID instanceID = new 
InstanceID(StringUtils.hexStringToByte(pathParams.get(TASK_MANAGER_ID_KEY)));
+                               CompletableFuture<Optional<Instance>> 
tmInstanceFuture = jobManagerGateway.requestTaskManagerInstance(instanceID, 
timeout);
+
+                               return tmInstanceFuture.thenApplyAsync(
+                                       (Optional<Instance> optTaskManager) -> {
+                                               try {
+                                                       return 
writeTaskManagersJson(
+                                                               
optTaskManager.map(Collections::singleton).orElse(Collections.emptySet()),
+                                                               pathParams);
+                                               } catch (IOException e) {
+                                                       throw new 
FlinkFutureException("Could not write TaskManagers JSON.", e);
+                                               }
+                                       },
+                                       executor);
+                       } else {
+                               CompletableFuture<Collection<Instance>> 
tmInstancesFuture = jobManagerGateway.requestTaskManagerInstances(timeout);
+
+                               return tmInstancesFuture.thenApplyAsync(
+                                       (Collection<Instance> taskManagers) -> {
+                                               try {
+                                                       return 
writeTaskManagersJson(taskManagers, pathParams);
+                                               } catch (IOException e) {
+                                                       throw new 
FlinkFutureException("Could not write TaskManagers JSON.", e);
+                                               }
+                                       },
+                                       executor);
+                       }
+               }
+               else {
+                       return FutureUtils.completedExceptionally(new 
Exception("No connection to the leading JobManager."));
+               }
+       }
 
-                               StringWriter writer = new StringWriter();
-                               JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-
-                               gen.writeStartObject();
-                               gen.writeArrayFieldStart("taskmanagers");
-
-                               for (Instance instance : instances) {
-                                       gen.writeStartObject();
-                                       gen.writeStringField("id", 
instance.getId().toString());
-                                       gen.writeStringField("path", 
instance.getTaskManagerGateway().getAddress());
-                                       gen.writeNumberField("dataPort", 
instance.getTaskManagerLocation().dataPort());
-                                       
gen.writeNumberField("timeSinceLastHeartbeat", instance.getLastHeartBeat());
-                                       gen.writeNumberField("slotsNumber", 
instance.getTotalNumberOfSlots());
-                                       gen.writeNumberField("freeSlots", 
instance.getNumberOfAvailableSlots());
-                                       gen.writeNumberField("cpuCores", 
instance.getResources().getNumberOfCPUCores());
-                                       gen.writeNumberField("physicalMemory", 
instance.getResources().getSizeOfPhysicalMemory());
-                                       gen.writeNumberField("freeMemory", 
instance.getResources().getSizeOfJvmHeap());
-                                       gen.writeNumberField("managedMemory", 
instance.getResources().getSizeOfManagedMemory());
-
-                                       // only send metrics when only one task 
manager requests them.
-                                       if 
(pathParams.containsKey(TASK_MANAGER_ID_KEY)) {
-                                               fetcher.update();
-                                               
MetricStore.TaskManagerMetricStore metrics = 
fetcher.getMetricStore().getTaskManagerMetricStore(instance.getId().toString());
-                                               if (metrics != null) {
-                                                       
gen.writeObjectFieldStart("metrics");
-                                                       long heapUsed = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Used", "0"));
-                                                       long heapCommitted = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Committed", "0"));
-                                                       long heapTotal = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Max", "0"));
-
-                                                       
gen.writeNumberField("heapCommitted", heapCommitted);
-                                                       
gen.writeNumberField("heapUsed", heapUsed);
-                                                       
gen.writeNumberField("heapMax", heapTotal);
-
-                                                       long nonHeapUsed = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Used", "0"));
-                                                       long nonHeapCommitted = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Committed", "0"));
-                                                       long nonHeapTotal = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Max", "0"));
-
-                                                       
gen.writeNumberField("nonHeapCommitted", nonHeapCommitted);
-                                                       
gen.writeNumberField("nonHeapUsed", nonHeapUsed);
-                                                       
gen.writeNumberField("nonHeapMax", nonHeapTotal);
-
-                                                       
gen.writeNumberField("totalCommitted", heapCommitted + nonHeapCommitted);
-                                                       
gen.writeNumberField("totalUsed", heapUsed + nonHeapUsed);
-                                                       
gen.writeNumberField("totalMax", heapTotal + nonHeapTotal);
-
-                                                       long directCount = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.Count", "0"));
-                                                       long directUsed = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.MemoryUsed", "0"));
-                                                       long directMax = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.TotalCapacity", "0"));
-
-                                                       
gen.writeNumberField("directCount", directCount);
-                                                       
gen.writeNumberField("directUsed", directUsed);
-                                                       
gen.writeNumberField("directMax", directMax);
-
-                                                       long mappedCount = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.Count", "0"));
-                                                       long mappedUsed = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.MemoryUsed", "0"));
-                                                       long mappedMax = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.TotalCapacity", "0"));
-
-                                                       
gen.writeNumberField("mappedCount", mappedCount);
-                                                       
gen.writeNumberField("mappedUsed", mappedUsed);
-                                                       
gen.writeNumberField("mappedMax", mappedMax);
-
-                                                       long 
memorySegmentsAvailable = 
Long.valueOf(metrics.getMetric("Status.Network.AvailableMemorySegments", "0"));
-                                                       long 
memorySegmentsTotal = 
Long.valueOf(metrics.getMetric("Status.Network.TotalMemorySegments", "0"));
-
-                                                       
gen.writeNumberField("memorySegmentsAvailable", memorySegmentsAvailable);
-                                                       
gen.writeNumberField("memorySegmentsTotal", memorySegmentsTotal);
-
-                                                       
gen.writeArrayFieldStart("garbageCollectors");
-
-                                                       for (String gcName : 
metrics.garbageCollectorNames) {
-                                                               String count = 
metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Count", null);
-                                                               String time = 
metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Time", null);
-                                                               if (count != 
null  && time != null) {
-                                                                       
gen.writeStartObject();
-                                                                       
gen.writeStringField("name", gcName);
-                                                                       
gen.writeNumberField("count", Long.valueOf(count));
-                                                                       
gen.writeNumberField("time", Long.valueOf(time));
-                                                                       
gen.writeEndObject();
-                                                               }
-                                                       }
-
-                                                       gen.writeEndArray();
+       private String writeTaskManagersJson(Collection<Instance> instances, 
Map<String, String> pathParams) throws IOException {
+               StringWriter writer = new StringWriter();
+               JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+
+               gen.writeStartObject();
+               gen.writeArrayFieldStart("taskmanagers");
+
+               for (Instance instance : instances) {
+                       gen.writeStartObject();
+                       gen.writeStringField("id", instance.getId().toString());
+                       gen.writeStringField("path", 
instance.getTaskManagerGateway().getAddress());
+                       gen.writeNumberField("dataPort", 
instance.getTaskManagerLocation().dataPort());
+                       gen.writeNumberField("timeSinceLastHeartbeat", 
instance.getLastHeartBeat());
+                       gen.writeNumberField("slotsNumber", 
instance.getTotalNumberOfSlots());
+                       gen.writeNumberField("freeSlots", 
instance.getNumberOfAvailableSlots());
+                       gen.writeNumberField("cpuCores", 
instance.getResources().getNumberOfCPUCores());
+                       gen.writeNumberField("physicalMemory", 
instance.getResources().getSizeOfPhysicalMemory());
+                       gen.writeNumberField("freeMemory", 
instance.getResources().getSizeOfJvmHeap());
+                       gen.writeNumberField("managedMemory", 
instance.getResources().getSizeOfManagedMemory());
+
+                       // only send metrics when only one task manager 
requests them.
+                       if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) {
+                               fetcher.update();
+                               MetricStore.TaskManagerMetricStore metrics = 
fetcher.getMetricStore().getTaskManagerMetricStore(instance.getId().toString());
+                               if (metrics != null) {
+                                       gen.writeObjectFieldStart("metrics");
+                                       long heapUsed = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Used", "0"));
+                                       long heapCommitted = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Committed", "0"));
+                                       long heapTotal = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Max", "0"));
+
+                                       gen.writeNumberField("heapCommitted", 
heapCommitted);
+                                       gen.writeNumberField("heapUsed", 
heapUsed);
+                                       gen.writeNumberField("heapMax", 
heapTotal);
+
+                                       long nonHeapUsed = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Used", "0"));
+                                       long nonHeapCommitted = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Committed", "0"));
+                                       long nonHeapTotal = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Max", "0"));
+
+                                       
gen.writeNumberField("nonHeapCommitted", nonHeapCommitted);
+                                       gen.writeNumberField("nonHeapUsed", 
nonHeapUsed);
+                                       gen.writeNumberField("nonHeapMax", 
nonHeapTotal);
+
+                                       gen.writeNumberField("totalCommitted", 
heapCommitted + nonHeapCommitted);
+                                       gen.writeNumberField("totalUsed", 
heapUsed + nonHeapUsed);
+                                       gen.writeNumberField("totalMax", 
heapTotal + nonHeapTotal);
+
+                                       long directCount = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.Count", "0"));
+                                       long directUsed = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.MemoryUsed", "0"));
+                                       long directMax = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.TotalCapacity", "0"));
+
+                                       gen.writeNumberField("directCount", 
directCount);
+                                       gen.writeNumberField("directUsed", 
directUsed);
+                                       gen.writeNumberField("directMax", 
directMax);
+
+                                       long mappedCount = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.Count", "0"));
+                                       long mappedUsed = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.MemoryUsed", "0"));
+                                       long mappedMax = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.TotalCapacity", "0"));
+
+                                       gen.writeNumberField("mappedCount", 
mappedCount);
+                                       gen.writeNumberField("mappedUsed", 
mappedUsed);
+                                       gen.writeNumberField("mappedMax", 
mappedMax);
+
+                                       long memorySegmentsAvailable = 
Long.valueOf(metrics.getMetric("Status.Network.AvailableMemorySegments", "0"));
+                                       long memorySegmentsTotal = 
Long.valueOf(metrics.getMetric("Status.Network.TotalMemorySegments", "0"));
+
+                                       
gen.writeNumberField("memorySegmentsAvailable", memorySegmentsAvailable);
+                                       
gen.writeNumberField("memorySegmentsTotal", memorySegmentsTotal);
+
+                                       
gen.writeArrayFieldStart("garbageCollectors");
+
+                                       for (String gcName : 
metrics.garbageCollectorNames) {
+                                               String count = 
metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Count", null);
+                                               String time = 
metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Time", null);
+                                               if (count != null  && time != 
null) {
+                                                       gen.writeStartObject();
+                                                       
gen.writeStringField("name", gcName);
+                                                       
gen.writeNumberField("count", Long.valueOf(count));
+                                                       
gen.writeNumberField("time", Long.valueOf(time));
                                                        gen.writeEndObject();
                                                }
                                        }
 
+                                       gen.writeEndArray();
                                        gen.writeEndObject();
                                }
-
-                               gen.writeEndArray();
-                               gen.writeEndObject();
-
-                               gen.close();
-                               return writer.toString();
-                       }
-                       else {
-                               throw new Exception("No connection to the 
leading JobManager.");
                        }
+
+                       gen.writeEndObject();
                }
-               catch (Exception e) {
-                       throw new RuntimeException("Failed to fetch list of all 
task managers: " + e.getMessage(), e);
-               }
+
+               gen.writeEndArray();
+               gen.writeEndObject();
+
+               gen.close();
+               return writer.toString();
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
index d4c9b2a..3affd7c 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
 
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
@@ -34,6 +35,8 @@ import java.io.StringWriter;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Handler that returns a job's snapshotting settings.
@@ -42,8 +45,8 @@ public class CheckpointConfigHandler extends 
AbstractExecutionGraphRequestHandle
 
        private static final String CHECKPOINT_CONFIG_REST_PATH = 
"/jobs/:jobid/checkpoints/config";
 
-       public CheckpointConfigHandler(ExecutionGraphHolder 
executionGraphHolder) {
-               super(executionGraphHolder);
+       public CheckpointConfigHandler(ExecutionGraphHolder 
executionGraphHolder, Executor executor) {
+               super(executionGraphHolder, executor);
        }
 
        @Override
@@ -52,8 +55,16 @@ public class CheckpointConfigHandler extends 
AbstractExecutionGraphRequestHandle
        }
 
        @Override
-       public String handleRequest(AccessExecutionGraph graph, Map<String, 
String> params) throws Exception {
-               return createCheckpointConfigJson(graph);
+       public CompletableFuture<String> handleRequest(AccessExecutionGraph 
graph, Map<String, String> params) {
+               return CompletableFuture.supplyAsync(
+                       () -> {
+                               try {
+                                       return 
createCheckpointConfigJson(graph);
+                               } catch (IOException e) {
+                                       throw new FlinkFutureException("Could 
not create checkpoint config json.", e);
+                               }
+                       },
+                       executor);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
index 664744b..96cc3e0 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
@@ -24,6 +24,7 @@ import 
org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
 import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
 import org.apache.flink.runtime.checkpoint.TaskStateStats;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import 
org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler;
@@ -40,6 +41,8 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Request handler that returns checkpoint stats for a single job vertex.
@@ -50,8 +53,8 @@ public class CheckpointStatsDetailsHandler extends 
AbstractExecutionGraphRequest
 
        private final CheckpointStatsCache cache;
 
-       public CheckpointStatsDetailsHandler(ExecutionGraphHolder 
executionGraphHolder, CheckpointStatsCache cache) {
-               super(executionGraphHolder);
+       public CheckpointStatsDetailsHandler(ExecutionGraphHolder 
executionGraphHolder, Executor executor, CheckpointStatsCache cache) {
+               super(executionGraphHolder, executor);
                this.cache = cache;
        }
 
@@ -61,30 +64,38 @@ public class CheckpointStatsDetailsHandler extends 
AbstractExecutionGraphRequest
        }
 
        @Override
-       public String handleRequest(AccessExecutionGraph graph, Map<String, 
String> params) throws Exception {
-               long checkpointId = parseCheckpointId(params);
-               if (checkpointId == -1) {
-                       return "{}";
-               }
-
-               CheckpointStatsSnapshot snapshot = 
graph.getCheckpointStatsSnapshot();
-               if (snapshot == null) {
-                       return "{}";
-               }
-
-               AbstractCheckpointStats checkpoint = 
snapshot.getHistory().getCheckpointById(checkpointId);
-
-               if (checkpoint != null) {
-                       cache.tryAdd(checkpoint);
-               } else {
-                       checkpoint = cache.tryGet(checkpointId);
-
-                       if (checkpoint == null) {
-                               return "{}";
-                       }
-               }
-
-               return createCheckpointDetailsJson(checkpoint);
+       public CompletableFuture<String> handleRequest(AccessExecutionGraph 
graph, Map<String, String> params) {
+               return CompletableFuture.supplyAsync(
+                       () -> {
+                               long checkpointId = parseCheckpointId(params);
+                               if (checkpointId == -1) {
+                                       return "{}";
+                               }
+
+                               CheckpointStatsSnapshot snapshot = 
graph.getCheckpointStatsSnapshot();
+                               if (snapshot == null) {
+                                       return "{}";
+                               }
+
+                               AbstractCheckpointStats checkpoint = 
snapshot.getHistory().getCheckpointById(checkpointId);
+
+                               if (checkpoint != null) {
+                                       cache.tryAdd(checkpoint);
+                               } else {
+                                       checkpoint = cache.tryGet(checkpointId);
+
+                                       if (checkpoint == null) {
+                                               return "{}";
+                                       }
+                               }
+
+                               try {
+                                       return 
createCheckpointDetailsJson(checkpoint);
+                               } catch (IOException e) {
+                                       throw new FlinkFutureException("Could 
not create checkpoint details json.", e);
+                               }
+                       },
+                       executor);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
index d116c56..045248b 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
@@ -24,6 +24,7 @@ import 
org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
 import org.apache.flink.runtime.checkpoint.SubtaskStateStats;
 import org.apache.flink.runtime.checkpoint.TaskStateStats;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
@@ -43,6 +44,8 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 import static 
org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointStatsHandler.writeMinMaxAvg;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -57,8 +60,8 @@ public class CheckpointStatsDetailsSubtasksHandler extends 
AbstractExecutionGrap
 
        private final CheckpointStatsCache cache;
 
-       public CheckpointStatsDetailsSubtasksHandler(ExecutionGraphHolder 
executionGraphHolder, CheckpointStatsCache cache) {
-               super(executionGraphHolder);
+       public CheckpointStatsDetailsSubtasksHandler(ExecutionGraphHolder 
executionGraphHolder, Executor executor, CheckpointStatsCache cache) {
+               super(executionGraphHolder, executor);
                this.cache = checkNotNull(cache);
        }
 
@@ -68,28 +71,28 @@ public class CheckpointStatsDetailsSubtasksHandler extends 
AbstractExecutionGrap
        }
 
        @Override
-       public String handleJsonRequest(
-               Map<String, String> pathParams,
-               Map<String, String> queryParams,
-               JobManagerGateway jobManagerGateway) throws Exception {
+       public CompletableFuture<String> handleJsonRequest(
+                       Map<String, String> pathParams,
+                       Map<String, String> queryParams,
+                       JobManagerGateway jobManagerGateway) {
                return super.handleJsonRequest(pathParams, queryParams, 
jobManagerGateway);
        }
 
        @Override
-       public String handleRequest(AccessExecutionGraph graph, Map<String, 
String> params) throws Exception {
+       public CompletableFuture<String> handleRequest(AccessExecutionGraph 
graph, Map<String, String> params) {
                long checkpointId = 
CheckpointStatsDetailsHandler.parseCheckpointId(params);
                if (checkpointId == -1) {
-                       return "{}";
+                       return CompletableFuture.completedFuture("{}");
                }
 
                JobVertexID vertexId = 
AbstractJobVertexRequestHandler.parseJobVertexId(params);
                if (vertexId == null) {
-                       return "{}";
+                       return CompletableFuture.completedFuture("{}");
                }
 
                CheckpointStatsSnapshot snapshot = 
graph.getCheckpointStatsSnapshot();
                if (snapshot == null) {
-                       return "{}";
+                       return CompletableFuture.completedFuture("{}");
                }
 
                AbstractCheckpointStats checkpoint = 
snapshot.getHistory().getCheckpointById(checkpointId);
@@ -100,16 +103,20 @@ public class CheckpointStatsDetailsSubtasksHandler 
extends AbstractExecutionGrap
                        checkpoint = cache.tryGet(checkpointId);
 
                        if (checkpoint == null) {
-                               return "{}";
+                               return CompletableFuture.completedFuture("{}");
                        }
                }
 
                TaskStateStats taskStats = 
checkpoint.getTaskStateStats(vertexId);
                if (taskStats == null) {
-                       return "{}";
+                       return CompletableFuture.completedFuture("{}");
                }
 
-               return createSubtaskCheckpointDetailsJson(checkpoint, 
taskStats);
+               try {
+                       return 
CompletableFuture.completedFuture(createSubtaskCheckpointDetailsJson(checkpoint,
 taskStats));
+               } catch (IOException e) {
+                       return FutureUtils.completedExceptionally(e);
+               }
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
index a86c5fd..a60aee0 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.runtime.checkpoint.CompletedCheckpointStatsSummary;
 import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
 import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
 import org.apache.flink.runtime.checkpoint.RestoredCheckpointStats;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import 
org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler;
@@ -43,6 +44,8 @@ import java.io.StringWriter;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Handler that returns checkpoint statistics for a job.
@@ -51,8 +54,8 @@ public class CheckpointStatsHandler extends 
AbstractExecutionGraphRequestHandler
 
        private static final String CHECKPOINT_STATS_REST_PATH = 
"/jobs/:jobid/checkpoints";
 
-       public CheckpointStatsHandler(ExecutionGraphHolder 
executionGraphHolder) {
-               super(executionGraphHolder);
+       public CheckpointStatsHandler(ExecutionGraphHolder 
executionGraphHolder, Executor executor) {
+               super(executionGraphHolder, executor);
        }
 
        @Override
@@ -61,8 +64,16 @@ public class CheckpointStatsHandler extends 
AbstractExecutionGraphRequestHandler
        }
 
        @Override
-       public String handleRequest(AccessExecutionGraph graph, Map<String, 
String> params) throws Exception {
-               return createCheckpointStatsJson(graph);
+       public CompletableFuture<String> handleRequest(AccessExecutionGraph 
graph, Map<String, String> params) {
+               return CompletableFuture.supplyAsync(
+                       () -> {
+                               try {
+                                       return createCheckpointStatsJson(graph);
+                               } catch (IOException e) {
+                                       throw new FlinkFutureException("Could 
not create checkpoint stats json.", e);
+                               }
+                       },
+                       executor);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
index b95f2c4..cf286ce 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.metrics;
 
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.webmonitor.handlers.AbstractJsonRequestHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
@@ -28,6 +29,8 @@ import com.fasterxml.jackson.core.JsonGenerator;
 import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Abstract request handler that returns a list of all available metrics or 
the values for a set of metrics.
@@ -43,17 +46,27 @@ import java.util.Map;
 public abstract class AbstractMetricsHandler extends 
AbstractJsonRequestHandler {
        private final MetricFetcher fetcher;
 
-       public AbstractMetricsHandler(MetricFetcher fetcher) {
+       public AbstractMetricsHandler(Executor executor, MetricFetcher fetcher) 
{
+               super(executor);
                this.fetcher = Preconditions.checkNotNull(fetcher);
        }
 
        @Override
-       public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws 
Exception {
-               fetcher.update();
-               String requestedMetricsList = queryParams.get("get");
-               return requestedMetricsList != null
-                       ? getMetricsValues(pathParams, requestedMetricsList)
-                       : getAvailableMetricsList(pathParams);
+       public CompletableFuture<String> handleJsonRequest(Map<String, String> 
pathParams, Map<String, String> queryParams, JobManagerGateway 
jobManagerGateway) {
+               return CompletableFuture.supplyAsync(
+                       () -> {
+                               fetcher.update();
+                               String requestedMetricsList = 
queryParams.get("get");
+                               try {
+                                       return requestedMetricsList != null
+                                               ? getMetricsValues(pathParams, 
requestedMetricsList)
+                                               : 
getAvailableMetricsList(pathParams);
+                               } catch (IOException e) {
+                                       throw new FlinkFutureException("Could 
not retrieve metrics.", e);
+                               }
+                       },
+                       executor);
+
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
index 7252d8a..2bd6683 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.metrics;
 
 import java.util.Map;
+import java.util.concurrent.Executor;
 
 /**
  * Request handler that returns for the job manager a list of all available 
metrics or the values for a set of metrics.
@@ -35,8 +36,8 @@ public class JobManagerMetricsHandler extends 
AbstractMetricsHandler {
 
        private static final String JOBMANAGER_METRICS_REST_PATH = 
"/jobmanager/metrics";
 
-       public JobManagerMetricsHandler(MetricFetcher fetcher) {
-               super(fetcher);
+       public JobManagerMetricsHandler(Executor executor, MetricFetcher 
fetcher) {
+               super(executor, fetcher);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
index a193457..e5e2500 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.metrics;
 
 import java.util.Map;
+import java.util.concurrent.Executor;
 
 /**
  * Request handler that returns for a given job a list of all available 
metrics or the values for a set of metrics.
@@ -35,8 +36,8 @@ public class JobMetricsHandler extends AbstractMetricsHandler 
{
        public static final String PARAMETER_JOB_ID = "jobid";
        private static final String JOB_METRICS_REST_PATH = 
"/jobs/:jobid/metrics";
 
-       public JobMetricsHandler(MetricFetcher fetcher) {
-               super(fetcher);
+       public JobMetricsHandler(Executor executor, MetricFetcher fetcher) {
+               super(executor, fetcher);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
index e893da4..1d2cd84 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.metrics;
 
 import java.util.Map;
+import java.util.concurrent.Executor;
 
 /**
  * Request handler that returns for a given task a list of all available 
metrics or the values for a set of metrics.
@@ -35,8 +36,8 @@ public class JobVertexMetricsHandler extends 
AbstractMetricsHandler {
        public static final String PARAMETER_VERTEX_ID = "vertexid";
        private static final String JOB_VERTEX_METRICS_REST_PATH = 
"/jobs/:jobid/vertices/:vertexid/metrics";
 
-       public JobVertexMetricsHandler(MetricFetcher fetcher) {
-               super(fetcher);
+       public JobVertexMetricsHandler(Executor executor, MetricFetcher 
fetcher) {
+               super(executor, fetcher);
        }
 
        @Override

Reply via email to