[FLINK-7381] [web] Decouple WebRuntimeMonitor from ActorGateway

This PR decouples the WebRuntimeMonitor from the ActorGateway by introducing
the JobManagerGateway interface, which can have multiple implementations. This
is a preliminary step for the integration of the existing WebRuntimeMonitor
with the Flip-6 JobMaster.

Add time unit for web.timeout

This closes #4492.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9f790d3e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9f790d3e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9f790d3e

Branch: refs/heads/master
Commit: 9f790d3efe5e8267da05eb97bbe07ca8a0f859fe
Parents: 00d5b62
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Wed Aug 2 18:43:00 2017 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Fri Aug 11 13:48:14 2017 +0200

----------------------------------------------------------------------
 docs/ops/config.md                              |   2 +
 .../apache/flink/configuration/WebOptions.java  |   7 +
 .../MesosApplicationMasterRunner.java           |  13 +-
 .../webmonitor/ExecutionGraphHolder.java        |  59 ++----
 .../runtime/webmonitor/JobManagerRetriever.java | 197 ------------------
 .../webmonitor/RuntimeMonitorHandler.java       |  16 +-
 .../webmonitor/RuntimeMonitorHandlerBase.java   |  36 ++--
 .../runtime/webmonitor/WebRuntimeMonitor.java   |  74 ++++---
 .../files/StaticFileServerHandler.java          |  35 ++--
 .../AbstractExecutionGraphRequestHandler.java   |  22 +-
 .../handlers/AbstractJsonRequestHandler.java    |  10 +-
 .../handlers/ClusterOverviewHandler.java        |  23 +--
 .../handlers/CurrentJobIdsHandler.java          |  22 +-
 .../handlers/CurrentJobsOverviewHandler.java    |  24 +--
 .../handlers/DashboardConfigHandler.java        |   4 +-
 .../handlers/HandlerRedirectUtils.java          |  41 ++--
 .../handlers/JarAccessDeniedHandler.java        |   4 +-
 .../webmonitor/handlers/JarDeleteHandler.java   |   4 +-
 .../webmonitor/handlers/JarListHandler.java     |   4 +-
 .../webmonitor/handlers/JarPlanHandler.java     |   4 +-
 .../webmonitor/handlers/JarRunHandler.java      |  20 +-
 .../webmonitor/handlers/JarUploadHandler.java   |   8 +-
 .../handlers/JobCancellationHandler.java        |  19 +-
 .../JobCancellationWithSavepointHandlers.java   | 124 ++++++------
 .../handlers/JobManagerConfigHandler.java       |   4 +-
 .../webmonitor/handlers/JobStoppingHandler.java |  19 +-
 .../webmonitor/handlers/RequestHandler.java     |   9 +-
 .../handlers/TaskManagerLogHandler.java         |  66 +++---
 .../handlers/TaskManagersHandler.java           |  39 ++--
 .../CheckpointStatsDetailsSubtasksHandler.java  |   6 +-
 .../metrics/AbstractMetricsHandler.java         |   4 +-
 .../webmonitor/metrics/MetricFetcher.java       | 202 +++++++++----------
 .../runtime/webmonitor/WebFrontendITCase.java   |  27 +--
 .../webmonitor/WebRuntimeMonitorITCase.java     |  73 ++++---
 .../handlers/ClusterOverviewHandlerTest.java    |   8 +-
 .../handlers/CurrentJobIdsHandlerTest.java      |   8 +-
 .../CurrentJobsOverviewHandlerTest.java         |  10 +-
 .../handlers/HandlerRedirectUtilsTest.java      |  39 ++--
 .../webmonitor/handlers/JarRunHandlerTest.java  |   5 +-
 .../handlers/JobAccumulatorsHandlerTest.java    |   5 +-
 .../handlers/JobCancellationHandlerTest.java    |   4 +-
 ...obCancellationWithSavepointHandlersTest.java |  82 ++++----
 .../handlers/JobConfigHandlerTest.java          |   5 +-
 .../handlers/JobDetailsHandlerTest.java         |   5 +-
 .../handlers/JobExceptionsHandlerTest.java      |   5 +-
 .../webmonitor/handlers/JobPlanHandlerTest.java |   5 +-
 .../handlers/JobStoppingHandlerTest.java        |   7 +-
 .../JobVertexAccumulatorsHandlerTest.java       |   5 +-
 .../JobVertexBackPressureHandlerTest.java       |   2 +-
 .../handlers/JobVertexDetailsHandlerTest.java   |   5 +-
 .../JobVertexTaskManagersHandlerTest.java       |   5 +-
 ...SubtaskCurrentAttemptDetailsHandlerTest.java |   6 +-
 ...ExecutionAttemptAccumulatorsHandlerTest.java |   5 +-
 ...btaskExecutionAttemptDetailsHandlerTest.java |   5 +-
 .../SubtasksAllAccumulatorsHandlerTest.java     |   5 +-
 .../handlers/SubtasksTimesHandlerTest.java      |   5 +-
 .../handlers/TaskManagerLogHandlerTest.java     |  54 ++---
 .../handlers/TaskManagersHandlerTest.java       |   7 +-
 .../metrics/AbstractMetricsHandlerTest.java     |  26 ++-
 .../metrics/JobManagerMetricsHandlerTest.java   |  20 +-
 .../metrics/JobMetricsHandlerTest.java          |  20 +-
 .../metrics/JobVertexMetricsHandlerTest.java    |  20 +-
 .../webmonitor/metrics/MetricFetcherTest.java   | 111 ++++------
 .../metrics/TaskManagerMetricsHandlerTest.java  |  20 +-
 .../runtime/akka/AkkaJobManagerGateway.java     | 190 +++++++++++++++--
 .../apache/flink/runtime/client/JobClient.java  |   6 +-
 .../clusterframework/BootstrapTools.java        |  34 ++--
 .../flink/runtime/concurrent/FutureUtils.java   |  10 +
 .../runtime/jobmaster/JobManagerGateway.java    | 122 ++++++++++-
 .../metrics/dump/MetricDumpSerialization.java   |   1 +
 .../runtime/webmonitor/WebMonitorUtils.java     |  29 ++-
 .../retriever/JobManagerRetriever.java          | 123 +++++++++++
 .../retriever/MetricQueryServiceGateway.java    |  36 ++++
 .../retriever/MetricQueryServiceRetriever.java  |  35 ++++
 .../retriever/impl/AkkaJobManagerRetriever.java |  69 +++++++
 .../retriever/impl/AkkaQueryServiceGateway.java |  53 +++++
 .../impl/AkkaQueryServiceRetriever.java         |  51 +++++
 .../retriever/impl/RpcJobManagerRetriever.java  |  46 +++++
 .../flink/runtime/jobmanager/JobManager.scala   |  10 +-
 .../runtime/minicluster/FlinkMiniCluster.scala  |  10 +-
 .../runtime/testingUtils/TestingUtils.scala     |   2 +
 .../flink/yarn/YarnApplicationMasterRunner.java |  12 +-
 82 files changed, 1551 insertions(+), 1018 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/docs/ops/config.md
----------------------------------------------------------------------
diff --git a/docs/ops/config.md b/docs/ops/config.md
index c8d5c92..4138b4d 100644
--- a/docs/ops/config.md
+++ b/docs/ops/config.md
@@ -389,6 +389,8 @@ These parameters allow for advanced tuning. The default 
values are sufficient wh
 
 - `web.access-control-allow-origin`: Enable custom access control parameter 
for allow origin header, default is `*`.
 
+- `web.timeout`: Timeout, in milliseconds, for asynchronous operations executed by the web 
frontend (DEFAULT: `10000`, 10 s)
+
 ### File Systems
 
 The parameters define the behavior of tasks that create result files.

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java
index f499045..3733244 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java
@@ -149,6 +149,13 @@ public class WebOptions {
                        .defaultValue(50)
                        
.withDeprecatedKeys("jobmanager.web.backpressure.delay-between-samples");
 
+       /**
+        * Timeout for asynchronous operations by the WebRuntimeMonitor in 
milliseconds.
+        */
+       public static final ConfigOption<Long> TIMEOUT = ConfigOptions
+               .key("web.timeout")
+               .defaultValue(10L * 1000L);
+
 
        private WebOptions() {
                throw new IllegalAccessError();

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index 260b7f3..7891386 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -18,10 +18,12 @@
 
 package org.apache.flink.mesos.runtime.clusterframework;
 
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.mesos.configuration.MesosOptions;
 import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
@@ -52,6 +54,8 @@ import org.apache.flink.runtime.util.Hardware;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitor;
+import 
org.apache.flink.runtime.webmonitor.retriever.impl.AkkaJobManagerRetriever;
+import 
org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
@@ -319,11 +323,16 @@ public class MesosApplicationMasterRunner {
                        // 2: the web monitor
                        LOG.debug("Starting Web Frontend");
 
+                       Time webMonitorTimeout = 
Time.milliseconds(config.getLong(WebOptions.TIMEOUT));
+
                        webMonitor = BootstrapTools.startWebMonitorIfConfigured(
                                config,
                                highAvailabilityServices,
-                               actorSystem,
-                               jobManager,
+                               new AkkaJobManagerRetriever(actorSystem, 
webMonitorTimeout),
+                               new AkkaQueryServiceRetriever(actorSystem, 
webMonitorTimeout),
+                               webMonitorTimeout,
+                               futureExecutor,
+                               AkkaUtils.getAkkaURL(actorSystem, jobManager),
                                LOG);
                        if (webMonitor != null) {
                                final URL webMonitorURL = new URL("http", 
appMasterHostname, webMonitor.getServerPort(), "/");

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
index 75b0475..739b375 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
@@ -19,20 +19,19 @@
 package org.apache.flink.runtime.webmonitor;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Optional;
 import java.util.WeakHashMap;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -48,7 +47,7 @@ public class ExecutionGraphHolder {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(ExecutionGraphHolder.class);
 
-       private final FiniteDuration timeout;
+       private final Time timeout;
 
        private final WeakHashMap<JobID, AccessExecutionGraph> cache = new 
WeakHashMap<>();
 
@@ -56,50 +55,36 @@ public class ExecutionGraphHolder {
                this(WebRuntimeMonitor.DEFAULT_REQUEST_TIMEOUT);
        }
 
-       public ExecutionGraphHolder(FiniteDuration timeout) {
+       public ExecutionGraphHolder(Time timeout) {
                this.timeout = checkNotNull(timeout);
        }
 
        /**
-        * Retrieves the execution graph with {@link JobID} jid or null if it 
cannot be found.
+        * Retrieves the execution graph with {@link JobID} jid wrapped in 
{@link Optional} or
+        * {@link Optional#empty()} if it cannot be found.
         *
         * @param jid jobID of the execution graph to be retrieved
-        * @return the retrieved execution graph or null if it is not 
retrievable
+        * @return Optional containing the ExecutionGraph if it could be retrieved, empty if 
no ExecutionGraph was found
+        * @throws Exception if the ExecutionGraph retrieval failed.
         */
-       public AccessExecutionGraph getExecutionGraph(JobID jid, ActorGateway 
jobManager) {
+       public Optional<AccessExecutionGraph> getExecutionGraph(JobID jid, 
JobManagerGateway jobManagerGateway) throws Exception {
                AccessExecutionGraph cached = cache.get(jid);
                if (cached != null) {
                        if (cached.getState() == JobStatus.SUSPENDED) {
                                cache.remove(jid);
                        } else {
-                               return cached;
+                               return Optional.of(cached);
                        }
                }
 
-               try {
-                       if (jobManager != null) {
-                               Future<Object> future = jobManager.ask(new 
JobManagerMessages.RequestJob(jid), timeout);
-                               Object result = Await.result(future, timeout);
-
-                               if (result instanceof 
JobManagerMessages.JobNotFound) {
-                                       return null;
-                               }
-                               else if (result instanceof 
JobManagerMessages.JobFound) {
-                                       AccessExecutionGraph eg = 
((JobManagerMessages.JobFound) result).executionGraph();
-                                       cache.put(jid, eg);
-                                       return eg;
-                               }
-                               else {
-                                       throw new RuntimeException("Unknown 
response from JobManager / Archive: " + result);
-                               }
-                       }
-                       else {
-                               LOG.warn("No connection to the leading 
JobManager.");
-                               return null;
-                       }
-               }
-               catch (Exception e) {
-                       throw new RuntimeException("Error requesting execution 
graph", e);
-               }
+               CompletableFuture<Optional<AccessExecutionGraph>> 
executionGraphFuture = jobManagerGateway.requestJob(jid, timeout);
+
+               Optional<AccessExecutionGraph> result = 
executionGraphFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+               return result.map((executionGraph) -> {
+                       cache.put(jid, executionGraph);
+
+                       return executionGraph;
+               });
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JobManagerRetriever.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JobManagerRetriever.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JobManagerRetriever.java
deleted file mode 100644
index 175a4b8..0000000
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JobManagerRetriever.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor;
-
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import 
org.apache.flink.runtime.messages.JobManagerMessages.ResponseWebMonitorPort;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.dispatch.Futures;
-import akka.dispatch.Mapper;
-import akka.dispatch.OnComplete;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.UUID;
-import java.util.concurrent.TimeoutException;
-
-import scala.Option;
-import scala.Tuple2;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.Promise;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Retrieves and stores the actor gateway to the current leading JobManager. 
In case of an error,
- * the {@link WebRuntimeMonitor} to which this instance is associated will be 
stopped.
- *
- * <p>The job manager gateway only works if the web monitor and the job 
manager run in the same
- * actor system, because many execution graph structures are not serializable. 
This breaks the nice
- * leader retrieval abstraction and we have a special code path in case that 
another job manager is
- * leader (see {@link 
org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils}. In such a
- * case, we get the address of the web monitor of the leading job manager and 
redirect to it
- * (instead of directly communicating with it).
- */
-public class JobManagerRetriever implements LeaderRetrievalListener {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(JobManagerRetriever.class);
-
-       private final Object waitLock = new Object();
-
-       private final WebMonitor webMonitor;
-       private final ActorSystem actorSystem;
-       private final FiniteDuration lookupTimeout;
-       private final FiniteDuration timeout;
-
-       private volatile Future<Tuple2<ActorGateway, Integer>> 
leaderGatewayPortFuture;
-
-       public JobManagerRetriever(
-                       WebMonitor webMonitor,
-                       ActorSystem actorSystem,
-                       FiniteDuration lookupTimeout,
-                       FiniteDuration timeout) {
-
-               this.webMonitor = checkNotNull(webMonitor);
-               this.actorSystem = checkNotNull(actorSystem);
-               this.lookupTimeout = checkNotNull(lookupTimeout);
-               this.timeout = checkNotNull(timeout);
-       }
-
-       /**
-        * Returns the currently known leading job manager gateway and its web 
monitor port.
-        */
-       public Option<Tuple2<ActorGateway, Integer>> 
getJobManagerGatewayAndWebPort() throws Exception {
-               if (leaderGatewayPortFuture != null) {
-                       Future<Tuple2<ActorGateway, Integer>> gatewayPortFuture 
= leaderGatewayPortFuture;
-
-                       if (gatewayPortFuture.isCompleted()) {
-                               Tuple2<ActorGateway, Integer> gatewayPort = 
Await.result(gatewayPortFuture, timeout);
-
-                               return Option.apply(gatewayPort);
-                       } else {
-                               return Option.empty();
-                       }
-               } else {
-                       return Option.empty();
-               }
-       }
-
-       /**
-        * Awaits the leading job manager gateway and its web monitor port.
-        */
-       public Tuple2<ActorGateway, Integer> awaitJobManagerGatewayAndWebPort() 
throws Exception {
-               Future<Tuple2<ActorGateway, Integer>> gatewayPortFuture = null;
-               Deadline deadline = timeout.fromNow();
-
-               while (!deadline.isOverdue()) {
-                       synchronized (waitLock) {
-                               gatewayPortFuture = leaderGatewayPortFuture;
-
-                               if (gatewayPortFuture != null) {
-                                       break;
-                               }
-
-                               waitLock.wait(deadline.timeLeft().toMillis());
-                       }
-               }
-
-               if (gatewayPortFuture == null) {
-                       throw new TimeoutException("There is no JobManager 
available.");
-               } else {
-                       return Await.result(gatewayPortFuture, 
deadline.timeLeft());
-               }
-       }
-
-       @Override
-       public void notifyLeaderAddress(final String leaderAddress, final UUID 
leaderSessionID) {
-               if (leaderAddress != null && !leaderAddress.equals("")) {
-                       try {
-                               final Promise<Tuple2<ActorGateway, Integer>> 
leaderGatewayPortPromise = new scala.concurrent.impl.Promise.DefaultPromise<>();
-
-                               synchronized (waitLock) {
-                                       leaderGatewayPortFuture = 
leaderGatewayPortPromise.future();
-                                       waitLock.notifyAll();
-                               }
-
-                               LOG.info("New leader reachable under {}:{}.", 
leaderAddress, leaderSessionID);
-
-                               AkkaUtils.getActorRefFuture(leaderAddress, 
actorSystem, lookupTimeout)
-                                               // Resolve the actor ref
-                                               .flatMap(new Mapper<ActorRef, 
Future<Tuple2<ActorGateway, Object>>>() {
-                                                       @Override
-                                                       public 
Future<Tuple2<ActorGateway, Object>> apply(ActorRef jobManagerRef) {
-                                                               ActorGateway 
leaderGateway = new AkkaActorGateway(
-                                                                               
jobManagerRef, leaderSessionID);
-
-                                                               Future<Object> 
webMonitorPort = leaderGateway.ask(
-                                                                       
JobManagerMessages.getRequestWebMonitorPort(),
-                                                                       
timeout);
-
-                                                               return 
Futures.successful(leaderGateway).zip(webMonitorPort);
-                                                       }
-                                               }, actorSystem.dispatcher())
-                                                               // Request the 
web monitor port
-                                               .onComplete(new 
OnComplete<Tuple2<ActorGateway, Object>>() {
-                                                       @Override
-                                                       public void 
onComplete(Throwable failure, Tuple2<ActorGateway, Object> success) throws 
Throwable {
-                                                               if (failure == 
null) {
-                                                                       if 
(success._2() instanceof ResponseWebMonitorPort) {
-                                                                               
int webMonitorPort = ((ResponseWebMonitorPort) success._2()).port();
-
-                                                                               
leaderGatewayPortPromise.success(new Tuple2<>(success._1(), webMonitorPort));
-                                                                       } else {
-                                                                               
leaderGatewayPortPromise.failure(new Exception("Received the message " +
-                                                                               
success._2() + " as response to " + 
JobManagerMessages.getRequestWebMonitorPort() +
-                                                                               
        ". But a message of type " + ResponseWebMonitorPort.class + " was 
expected."));
-                                                                       }
-                                                               } else {
-                                                                       
LOG.warn("Failed to retrieve leader gateway and port.", failure);
-                                                                       
leaderGatewayPortPromise.failure(failure);
-                                                               }
-                                                       }
-                                               }, actorSystem.dispatcher());
-                       }
-                       catch (Exception e) {
-                               handleError(e);
-                       }
-               }
-       }
-
-       @Override
-       public void handleError(Exception exception) {
-               LOG.error("Received error from LeaderRetrievalService.", 
exception);
-
-               try {
-                       // stop associated webMonitor
-                       webMonitor.stop();
-               }
-               catch (Exception e) {
-                       LOG.error("Error while stopping the web server due to a 
LeaderRetrievalService error.", e);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
index 4777202..35d13dd 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
@@ -18,9 +18,11 @@
 
 package org.apache.flink.runtime.webmonitor;
 
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
+import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
 import org.apache.flink.util.ExceptionUtils;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
@@ -43,9 +45,7 @@ import java.net.URLDecoder;
 import java.nio.charset.Charset;
 import java.util.HashMap;
 import java.util.Map;
-
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -72,8 +72,8 @@ public class RuntimeMonitorHandler extends 
RuntimeMonitorHandlerBase {
                        WebMonitorConfig cfg,
                        RequestHandler handler,
                        JobManagerRetriever retriever,
-                       Future<String> localJobManagerAddressFuture,
-                       FiniteDuration timeout,
+                       CompletableFuture<String> localJobManagerAddressFuture,
+                       Time timeout,
                        boolean httpsEnabled) {
 
                super(retriever, localJobManagerAddressFuture, timeout, 
httpsEnabled);
@@ -87,7 +87,7 @@ public class RuntimeMonitorHandler extends 
RuntimeMonitorHandlerBase {
        }
 
        @Override
-       protected void respondAsLeader(ChannelHandlerContext ctx, Routed 
routed, ActorGateway jobManager) {
+       protected void respondAsLeader(ChannelHandlerContext ctx, Routed 
routed, JobManagerGateway jobManagerGateway) {
                FullHttpResponse response;
 
                try {
@@ -106,7 +106,7 @@ public class RuntimeMonitorHandler extends 
RuntimeMonitorHandlerBase {
                        queryParams.put(WEB_MONITOR_ADDRESS_KEY,
                                (httpsEnabled ? "https://" : "http://") + 
address.getHostName() + ":" + address.getPort());
 
-                       response = handler.handleRequest(pathParams, 
queryParams, jobManager);
+                       response = handler.handleRequest(pathParams, 
queryParams, jobManagerGateway);
                }
                catch (NotFoundException e) {
                        // this should result in a 404 error code (not found)

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java
index d524632..4cb55f1 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java
@@ -18,9 +18,11 @@
 
 package org.apache.flink.runtime.webmonitor;
 
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils;
 import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
+import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
@@ -29,11 +31,9 @@ import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite;
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
 
-import scala.Option;
-import scala.Tuple2;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -48,9 +48,9 @@ public abstract class RuntimeMonitorHandlerBase extends 
SimpleChannelInboundHand
 
        private final JobManagerRetriever retriever;
 
-       protected final Future<String> localJobManagerAddressFuture;
+       protected final CompletableFuture<String> localJobManagerAddressFuture;
 
-       protected final FiniteDuration timeout;
+       protected final Time timeout;
 
        /** Whether the web service has https enabled. */
        protected final boolean httpsEnabled;
@@ -59,8 +59,8 @@ public abstract class RuntimeMonitorHandlerBase extends 
SimpleChannelInboundHand
 
        public RuntimeMonitorHandlerBase(
                JobManagerRetriever retriever,
-               Future<String> localJobManagerAddressFuture,
-               FiniteDuration timeout,
+               CompletableFuture<String> localJobManagerAddressFuture,
+               Time timeout,
                boolean httpsEnabled) {
 
                this.retriever = checkNotNull(retriever);
@@ -78,17 +78,17 @@ public abstract class RuntimeMonitorHandlerBase extends 
SimpleChannelInboundHand
 
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Routed routed) 
throws Exception {
-               if (localJobManagerAddressFuture.isCompleted()) {
+               if (localJobManagerAddressFuture.isDone()) {
                        if (localJobManagerAddress == null) {
-                               localJobManagerAddress = 
Await.result(localJobManagerAddressFuture, timeout);
+                               localJobManagerAddress = 
localJobManagerAddressFuture.get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
                        }
 
-                       Option<Tuple2<ActorGateway, Integer>> jobManager = 
retriever.getJobManagerGatewayAndWebPort();
+                       Optional<JobManagerGateway> optJobManagerGateway = 
retriever.getJobManagerGatewayNow();
 
-                       if (jobManager.isDefined()) {
-                               Tuple2<ActorGateway, Integer> gatewayPort = 
jobManager.get();
+                       if (optJobManagerGateway.isPresent()) {
+                               JobManagerGateway jobManagerGateway = 
optJobManagerGateway.get();
                                String redirectAddress = 
HandlerRedirectUtils.getRedirectAddress(
-                                       localJobManagerAddress, gatewayPort);
+                                       localJobManagerAddress, 
jobManagerGateway, timeout);
 
                                if (redirectAddress != null) {
                                        HttpResponse redirect = 
HandlerRedirectUtils.getRedirectResponse(redirectAddress, routed.path(),
@@ -96,7 +96,7 @@ public abstract class RuntimeMonitorHandlerBase extends 
SimpleChannelInboundHand
                                        KeepAliveWrite.flush(ctx, 
routed.request(), redirect);
                                }
                                else {
-                                       respondAsLeader(ctx, routed, 
gatewayPort._1());
+                                       respondAsLeader(ctx, routed, 
jobManagerGateway);
                                }
                        } else {
                                KeepAliveWrite.flush(ctx, routed.request(), 
HandlerRedirectUtils.getUnavailableResponse());
@@ -106,5 +106,5 @@ public abstract class RuntimeMonitorHandlerBase extends 
SimpleChannelInboundHand
                }
        }
 
-       protected abstract void respondAsLeader(ChannelHandlerContext ctx, 
Routed routed, ActorGateway jobManager);
+       protected abstract void respondAsLeader(ChannelHandlerContext ctx, 
Routed routed, JobManagerGateway jobManagerGateway);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index e27a15f..17f02f0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.WebOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobView;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -71,12 +70,14 @@ import 
org.apache.flink.runtime.webmonitor.metrics.JobMetricsHandler;
 import org.apache.flink.runtime.webmonitor.metrics.JobVertexMetricsHandler;
 import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
 import org.apache.flink.runtime.webmonitor.metrics.TaskManagerMetricsHandler;
+import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;
 import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.Preconditions;
 
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router;
 
-import akka.actor.ActorSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -85,16 +86,11 @@ import javax.net.ssl.SSLContext;
 import java.io.File;
 import java.io.IOException;
 import java.util.UUID;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import scala.concurrent.ExecutionContext$;
-import scala.concurrent.ExecutionContextExecutor;
-import scala.concurrent.Promise;
-import scala.concurrent.duration.FiniteDuration;
-
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -108,7 +104,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 public class WebRuntimeMonitor implements WebMonitor {
 
        /** By default, all requests to the JobManager have a timeout of 10 
seconds. */
-       public static final FiniteDuration DEFAULT_REQUEST_TIMEOUT = new 
FiniteDuration(10, TimeUnit.SECONDS);
+       public static final Time DEFAULT_REQUEST_TIMEOUT = Time.seconds(10L);
 
        /** Logger for web frontend startup / shutdown messages. */
        private static final Logger LOG = 
LoggerFactory.getLogger(WebRuntimeMonitor.class);
@@ -120,14 +116,15 @@ public class WebRuntimeMonitor implements WebMonitor {
 
        private final LeaderRetrievalService leaderRetrievalService;
 
-       /** LeaderRetrievalListener which stores the currently leading 
JobManager and its archive. */
+       /** Service which retrieves the currently leading JobManager and opens 
a JobManagerGateway. */
        private final JobManagerRetriever retriever;
 
        private final SSLContext serverSSLContext;
 
-       private final Promise<String> jobManagerAddressPromise = new 
scala.concurrent.impl.Promise.DefaultPromise<>();
+       private final CompletableFuture<String> jobManagerAddressFuture = new 
CompletableFuture<>();
+
+       private final Time timeout;
 
-       private final FiniteDuration timeout;
        private final WebFrontendBootstrap netty;
 
        private final File webRootDir;
@@ -142,7 +139,6 @@ public class WebRuntimeMonitor implements WebMonitor {
 
        private AtomicBoolean cleanedUp = new AtomicBoolean();
 
-       private ExecutorService executorService;
 
        private MetricFetcher metricFetcher;
 
@@ -150,11 +146,15 @@ public class WebRuntimeMonitor implements WebMonitor {
                        Configuration config,
                        LeaderRetrievalService leaderRetrievalService,
                        BlobView blobView,
-                       ActorSystem actorSystem) throws IOException, 
InterruptedException {
+                       JobManagerRetriever jobManagerRetriever,
+                       MetricQueryServiceRetriever queryServiceRetriever,
+                       Time timeout,
+                       Executor executor) throws IOException, 
InterruptedException {
 
                this.leaderRetrievalService = 
checkNotNull(leaderRetrievalService);
-               this.timeout = AkkaUtils.getTimeout(config);
-               this.retriever = new JobManagerRetriever(this, actorSystem, 
AkkaUtils.getTimeout(config), timeout);
+               this.retriever = 
Preconditions.checkNotNull(jobManagerRetriever);
+               this.timeout = Preconditions.checkNotNull(timeout);
+
                this.cfg = new WebMonitorConfig(config);
 
                final String configuredAddress = cfg.getWebFrontendAddress();
@@ -191,7 +191,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 
                // - Back pressure stats 
----------------------------------------------
 
-               stackTraceSamples = new 
StackTraceSampleCoordinator(actorSystem.dispatcher(), 60000);
+               stackTraceSamples = new StackTraceSampleCoordinator(executor, 
60000);
 
                // Back pressure stats tracker config
                int cleanUpInterval = 
config.getInteger(WebOptions.BACKPRESSURE_CLEANUP_INTERVAL);
@@ -209,10 +209,6 @@ public class WebRuntimeMonitor implements WebMonitor {
 
                // 
--------------------------------------------------------------------
 
-               executorService = new ForkJoinPool();
-
-               ExecutionContextExecutor context = 
ExecutionContext$.MODULE$.fromExecutor(executorService);
-
                // Config to enable https access to the web-ui
                boolean enableSSL = config.getBoolean(WebOptions.SSL_ENABLED) 
&& SSLUtils.getSSLEnabled(config);
 
@@ -226,11 +222,11 @@ public class WebRuntimeMonitor implements WebMonitor {
                } else {
                        serverSSLContext = null;
                }
-               metricFetcher = new MetricFetcher(actorSystem, retriever, 
context);
+               metricFetcher = new MetricFetcher(retriever, 
queryServiceRetriever, executor, timeout);
 
                String defaultSavepointDir = 
config.getString(CoreOptions.SAVEPOINT_DIRECTORY);
 
-               JobCancellationWithSavepointHandlers cancelWithSavepoint = new 
JobCancellationWithSavepointHandlers(currentGraphs, context, 
defaultSavepointDir);
+               JobCancellationWithSavepointHandlers cancelWithSavepoint = new 
JobCancellationWithSavepointHandlers(currentGraphs, executor, 
defaultSavepointDir);
                RuntimeMonitorHandler triggerHandler = 
handler(cancelWithSavepoint.getTriggerHandler());
                RuntimeMonitorHandler inProgressHandler = 
handler(cancelWithSavepoint.getInProgressHandler());
 
@@ -274,8 +270,8 @@ public class WebRuntimeMonitor implements WebMonitor {
                get(router,
                        new TaskManagerLogHandler(
                                retriever,
-                               context,
-                               jobManagerAddressPromise.future(),
+                               executor,
+                               jobManagerAddressFuture,
                                timeout,
                                TaskManagerLogHandler.FileMode.LOG,
                                config,
@@ -284,8 +280,8 @@ public class WebRuntimeMonitor implements WebMonitor {
                get(router,
                        new TaskManagerLogHandler(
                                retriever,
-                               context,
-                               jobManagerAddressPromise.future(),
+                               executor,
+                               jobManagerAddressFuture,
                                timeout,
                                TaskManagerLogHandler.FileMode.STDOUT,
                                config,
@@ -296,27 +292,27 @@ public class WebRuntimeMonitor implements WebMonitor {
                router
                        // log and stdout
                        .GET("/jobmanager/log", logFiles.logFile == null ? new 
ConstantTextHandler("(log file unavailable)") :
-                               new StaticFileServerHandler(retriever, 
jobManagerAddressPromise.future(), timeout, logFiles.logFile,
+                               new StaticFileServerHandler(retriever, 
jobManagerAddressFuture, timeout, logFiles.logFile,
                                        enableSSL))
 
                        .GET("/jobmanager/stdout", logFiles.stdOutFile == null 
? new ConstantTextHandler("(stdout file unavailable)") :
-                               new StaticFileServerHandler(retriever, 
jobManagerAddressPromise.future(), timeout, logFiles.stdOutFile,
+                               new StaticFileServerHandler(retriever, 
jobManagerAddressFuture, timeout, logFiles.stdOutFile,
                                        enableSSL));
 
                get(router, new JobManagerMetricsHandler(metricFetcher));
 
                // Cancel a job via GET (for proper integration with YARN this 
has to be performed via GET)
-               get(router, new JobCancellationHandler());
+               get(router, new JobCancellationHandler(timeout));
                // DELETE is the preferred way of canceling a job (Rest-conform)
-               delete(router, new JobCancellationHandler());
+               delete(router, new JobCancellationHandler(timeout));
 
                get(router, triggerHandler);
                get(router, inProgressHandler);
 
                // stop a job via GET (for proper integration with YARN this 
has to be performed via GET)
-               get(router, new JobStoppingHandler());
+               get(router, new JobStoppingHandler(timeout));
                // DELETE is the preferred way of stopping a job (Rest-conform)
-               delete(router, new JobStoppingHandler());
+               delete(router, new JobStoppingHandler(timeout));
 
                int maxCachedEntries = 
config.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE);
                CheckpointStatsCache cache = new 
CheckpointStatsCache(maxCachedEntries);
@@ -351,7 +347,7 @@ public class WebRuntimeMonitor implements WebMonitor {
                }
 
                // this handler serves all the static contents
-               router.GET("/:*", new StaticFileServerHandler(retriever, 
jobManagerAddressPromise.future(), timeout, webRootDir,
+               router.GET("/:*", new StaticFileServerHandler(retriever, 
jobManagerAddressFuture, timeout, webRootDir,
                        enableSSL));
 
                // add shutdown hook for deleting the directories and remaining 
temp files on shutdown
@@ -387,7 +383,7 @@ public class WebRuntimeMonitor implements WebMonitor {
         * @return array of all JsonArchivists relevant for the history server
         */
        public static JsonArchivist[] getJsonArchivists() {
-               JsonArchivist[] archivists = new JsonArchivist[]{
+               JsonArchivist[] archivists = {
                        new 
CurrentJobsOverviewHandler.CurrentJobsOverviewJsonArchivist(),
 
                        new JobPlanHandler.JobPlanJsonArchivist(),
@@ -418,7 +414,7 @@ public class WebRuntimeMonitor implements WebMonitor {
                LOG.info("Starting with JobManager {} on port {}", 
jobManagerAkkaUrl, getServerPort());
 
                synchronized (startupShutdownLock) {
-                       jobManagerAddressPromise.success(jobManagerAkkaUrl);
+                       jobManagerAddressFuture.complete(jobManagerAkkaUrl);
                        leaderRetrievalService.start(retriever);
 
                        long delay = 
backPressureStatsTracker.getCleanUpInterval();
@@ -451,8 +447,6 @@ public class WebRuntimeMonitor implements WebMonitor {
 
                        backPressureStatsTracker.shutDown();
 
-                       executorService.shutdownNow();
-
                        cleanup();
                }
        }
@@ -522,7 +516,7 @@ public class WebRuntimeMonitor implements WebMonitor {
        // 
------------------------------------------------------------------------
 
        private RuntimeMonitorHandler handler(RequestHandler handler) {
-               return new RuntimeMonitorHandler(cfg, handler, retriever, 
jobManagerAddressPromise.future(), timeout,
+               return new RuntimeMonitorHandler(cfg, handler, retriever, 
jobManagerAddressFuture, timeout,
                        serverSSLContext !=  null);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
index be6928e..15acb00 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
@@ -26,9 +26,10 @@ package org.apache.flink.runtime.webmonitor.files;
  * 
https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java
  *****************************************************************************/
 
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils;
+import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
@@ -70,13 +71,9 @@ import java.util.Calendar;
 import java.util.Date;
 import java.util.GregorianCalendar;
 import java.util.Locale;
+import java.util.Optional;
 import java.util.TimeZone;
-
-import scala.Option;
-import scala.Tuple2;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+import java.util.concurrent.CompletableFuture;
 
 import static 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CACHE_CONTROL;
 import static 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
@@ -118,9 +115,9 @@ public class StaticFileServerHandler extends 
SimpleChannelInboundHandler<Routed>
 
        private final JobManagerRetriever retriever;
 
-       private final Future<String> localJobManagerAddressFuture;
+       private final CompletableFuture<String> localJobManagerAddressFuture;
 
-       private final FiniteDuration timeout;
+       private final Time timeout;
 
        /** The path in which the static documents are. */
        private final File rootPath;
@@ -135,8 +132,8 @@ public class StaticFileServerHandler extends 
SimpleChannelInboundHandler<Routed>
 
        public StaticFileServerHandler(
                        JobManagerRetriever retriever,
-                       Future<String> localJobManagerAddressPromise,
-                       FiniteDuration timeout,
+                       CompletableFuture<String> localJobManagerAddressPromise,
+                       Time timeout,
                        File rootPath,
                        boolean httpsEnabled) throws IOException {
 
@@ -145,8 +142,8 @@ public class StaticFileServerHandler extends 
SimpleChannelInboundHandler<Routed>
 
        public StaticFileServerHandler(
                        JobManagerRetriever retriever,
-                       Future<String> localJobManagerAddressFuture,
-                       FiniteDuration timeout,
+                       CompletableFuture<String> localJobManagerAddressFuture,
+                       Time timeout,
                        File rootPath,
                        boolean httpsEnabled,
                        Logger logger) throws IOException {
@@ -165,9 +162,9 @@ public class StaticFileServerHandler extends 
SimpleChannelInboundHandler<Routed>
 
        @Override
        public void channelRead0(ChannelHandlerContext ctx, Routed routed) 
throws Exception {
-               if (localJobManagerAddressFuture.isCompleted()) {
+               if (localJobManagerAddressFuture.isDone()) {
                        if (localJobManagerAddress == null) {
-                               localJobManagerAddress = 
Await.result(localJobManagerAddressFuture, timeout);
+                               localJobManagerAddress = 
localJobManagerAddressFuture.get();
                        }
 
                        final HttpRequest request = routed.request();
@@ -183,12 +180,12 @@ public class StaticFileServerHandler extends 
SimpleChannelInboundHandler<Routed>
                                requestPath = "";
                        }
 
-                       Option<Tuple2<ActorGateway, Integer>> jobManager = 
retriever.getJobManagerGatewayAndWebPort();
+                       Optional<JobManagerGateway> optJobManagerGateway = 
retriever.getJobManagerGatewayNow();
 
-                       if (jobManager.isDefined()) {
+                       if (optJobManagerGateway.isPresent()) {
                                // Redirect to leader if necessary
                                String redirectAddress = 
HandlerRedirectUtils.getRedirectAddress(
-                                       localJobManagerAddress, 
jobManager.get());
+                                       localJobManagerAddress, 
optJobManagerGateway.get(), timeout);
 
                                if (redirectAddress != null) {
                                        HttpResponse redirect = 
HandlerRedirectUtils.getRedirectResponse(

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
index d6c17af..89108db 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
@@ -20,11 +20,14 @@ package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.NotFoundException;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
 
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * Base class for request handlers whose response depends on an ExecutionGraph
@@ -35,11 +38,11 @@ public abstract class AbstractExecutionGraphRequestHandler 
extends AbstractJsonR
        private final ExecutionGraphHolder executionGraphHolder;
 
        public AbstractExecutionGraphRequestHandler(ExecutionGraphHolder 
executionGraphHolder) {
-               this.executionGraphHolder = executionGraphHolder;
+               this.executionGraphHolder = 
Preconditions.checkNotNull(executionGraphHolder);
        }
 
        @Override
-       public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+       public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws 
Exception {
                String jidString = pathParams.get("jobid");
                if (jidString == null) {
                        throw new RuntimeException("JobId parameter missing");
@@ -53,12 +56,17 @@ public abstract class AbstractExecutionGraphRequestHandler 
extends AbstractJsonR
                        throw new RuntimeException("Invalid JobID string '" + 
jidString + "': " + e.getMessage());
                }
 
-               AccessExecutionGraph eg = 
executionGraphHolder.getExecutionGraph(jid, jobManager);
-               if (eg == null) {
-                       throw new NotFoundException("Could not find job with id 
" + jid);
+               final Optional<AccessExecutionGraph> optGraph;
+
+               try {
+                       optGraph = executionGraphHolder.getExecutionGraph(jid, 
jobManagerGateway);
+               } catch (Exception e) {
+                       throw new FlinkException("Could not retrieve 
ExecutionGraph for job with jobId " + jid + " from the JobManager.", e);
                }
 
-               return handleRequest(eg, pathParams);
+               final AccessExecutionGraph graph = optGraph.orElseThrow(() -> 
new NotFoundException("Could not find job with jobId " + jid + '.'));
+
+               return handleRequest(graph, pathParams);
        }
 
        public abstract String handleRequest(AccessExecutionGraph graph, 
Map<String, String> params) throws Exception;

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java
index 2b4a45f..266ffb0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
@@ -38,8 +38,8 @@ public abstract class AbstractJsonRequestHandler implements 
RequestHandler {
        private static final Charset ENCODING = Charset.forName("UTF-8");
 
        @Override
-       public FullHttpResponse handleRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
-               String result = handleJsonRequest(pathParams, queryParams, 
jobManager);
+       public FullHttpResponse handleRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws 
Exception {
+               String result = handleJsonRequest(pathParams, queryParams, 
jobManagerGateway);
                byte[] bytes = result.getBytes(ENCODING);
 
                DefaultFullHttpResponse response = new DefaultFullHttpResponse(
@@ -57,7 +57,7 @@ public abstract class AbstractJsonRequestHandler implements 
RequestHandler {
         *
         * @param pathParams The map of REST path parameters, decoded by the 
router.
         * @param queryParams The map of query parameters.
-        * @param jobManager The JobManager actor.
+        * @param jobManagerGateway to communicate with the JobManager.
         *
         * @return The JSON string that is the HTTP response.
         *
@@ -69,6 +69,6 @@ public abstract class AbstractJsonRequestHandler implements 
RequestHandler {
        public abstract String handleJsonRequest(
                        Map<String, String> pathParams,
                        Map<String, String> queryParams,
-                       ActorGateway jobManager) throws Exception;
+                       JobManagerGateway jobManagerGateway) throws Exception;
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
index 816ef24..4ebc4e7 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.messages.webmonitor.RequestStatusOverview;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 
@@ -27,10 +27,8 @@ import com.fasterxml.jackson.core.JsonGenerator;
 
 import java.io.StringWriter;
 import java.util.Map;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -46,9 +44,9 @@ public class ClusterOverviewHandler extends 
AbstractJsonRequestHandler {
 
        private static final String commitID = 
EnvironmentInformation.getRevisionInformation().commitId;
 
-       private final FiniteDuration timeout;
+       private final Time timeout;
 
-       public ClusterOverviewHandler(FiniteDuration timeout) {
+       public ClusterOverviewHandler(Time timeout) {
                this.timeout = checkNotNull(timeout);
        }
 
@@ -58,12 +56,13 @@ public class ClusterOverviewHandler extends 
AbstractJsonRequestHandler {
        }
 
        @Override
-       public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+       public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws 
Exception {
                // we need no parameters, get all requests
                try {
-                       if (jobManager != null) {
-                               Future<Object> future = 
jobManager.ask(RequestStatusOverview.getInstance(), timeout);
-                               StatusOverview overview = (StatusOverview) 
Await.result(future, timeout);
+                       if (jobManagerGateway != null) {
+                               CompletableFuture<StatusOverview> 
overviewFuture = jobManagerGateway.requestStatusOverview(timeout);
+
+                               StatusOverview overview = 
overviewFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 
                                StringWriter writer = new StringWriter();
                                JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
index 9d0b863..778a300 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
@@ -19,18 +19,16 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
-import org.apache.flink.runtime.messages.webmonitor.RequestJobsWithIDsOverview;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 
 import java.io.StringWriter;
 import java.util.Map;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
 import static java.util.Objects.requireNonNull;
 
@@ -43,9 +41,9 @@ public class CurrentJobIdsHandler extends AbstractJsonRequestHandler {
 
        private static final String CURRENT_JOB_IDS_REST_PATH = "/jobs";
 
-       private final FiniteDuration timeout;
+       private final Time timeout;
 
-       public CurrentJobIdsHandler(FiniteDuration timeout) {
+       public CurrentJobIdsHandler(Time timeout) {
                this.timeout = requireNonNull(timeout);
        }
 
@@ -55,12 +53,12 @@ public class CurrentJobIdsHandler extends AbstractJsonRequestHandler {
        }
 
        @Override
-       public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+       public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws 
Exception {
                // we need no parameters, get all requests
                try {
-                       if (jobManager != null) {
-                               Future<Object> future = 
jobManager.ask(RequestJobsWithIDsOverview.getInstance(), timeout);
-                               JobsWithIDsOverview overview = 
(JobsWithIDsOverview) Await.result(future, timeout);
+                       if (jobManagerGateway != null) {
+                               CompletableFuture<JobsWithIDsOverview> 
overviewFuture = jobManagerGateway.requestJobsOverview(timeout);
+                               JobsWithIDsOverview overview = 
overviewFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 
                                StringWriter writer = new StringWriter();
                                JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
index d0518c8..b324426 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
-import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
@@ -35,10 +35,8 @@ import java.io.StringWriter;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -51,13 +49,13 @@ public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler {
        private static final String RUNNING_JOBS_REST_PATH = 
"/joboverview/running";
        private static final String COMPLETED_JOBS_REST_PATH = 
"/joboverview/completed";
 
-       private final FiniteDuration timeout;
+       private final Time timeout;
 
        private final boolean includeRunningJobs;
        private final boolean includeFinishedJobs;
 
        public CurrentJobsOverviewHandler(
-                       FiniteDuration timeout,
+                       Time timeout,
                        boolean includeRunningJobs,
                        boolean includeFinishedJobs) {
 
@@ -79,13 +77,11 @@ public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler {
        }
 
        @Override
-       public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+       public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws 
Exception {
                try {
-                       if (jobManager != null) {
-                               Future<Object> future = jobManager.ask(
-                                               new 
RequestJobDetails(includeRunningJobs, includeFinishedJobs), timeout);
-
-                               MultipleJobsDetails result = 
(MultipleJobsDetails) Await.result(future, timeout);
+                       if (jobManagerGateway != null) {
+                               CompletableFuture<MultipleJobsDetails> 
jobDetailsFuture = jobManagerGateway.requestJobDetails(includeRunningJobs, 
includeFinishedJobs, timeout);
+                               MultipleJobsDetails result = 
jobDetailsFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 
                                final long now = System.currentTimeMillis();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
index 312c890..fe1d06b 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 
 import com.fasterxml.jackson.core.JsonGenerator;
@@ -55,7 +55,7 @@ public class DashboardConfigHandler extends AbstractJsonRequestHandler {
        }
 
        @Override
-       public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+       public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws 
Exception {
                return this.configString;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
index 9fbafb8..e27d125 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
@@ -18,9 +18,10 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.webmonitor.files.MimeTypes;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
@@ -30,13 +31,8 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import scala.Tuple2;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -49,35 +45,26 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class HandlerRedirectUtils {
 
-       private static final Logger LOG = 
LoggerFactory.getLogger(HandlerRedirectUtils.class);
-
-       /** Pattern to extract the host from an remote Akka URL. */
-       private static final Pattern LeaderAddressHostPattern = 
Pattern.compile("^.+@(.+):([0-9]+)/user/.+$");
-
        public static String getRedirectAddress(
                        String localJobManagerAddress,
-                       Tuple2<ActorGateway, Integer> leader) throws Exception {
+                       JobManagerGateway jobManagerGateway,
+                       Time timeout) throws Exception {
 
-               final String leaderAddress = leader._1().path();
-               final int webMonitorPort = leader._2();
+               final String leaderAddress = jobManagerGateway.getAddress();
 
                final String jobManagerName = 
localJobManagerAddress.substring(localJobManagerAddress.lastIndexOf("/") + 1);
 
                if (!localJobManagerAddress.equals(leaderAddress) &&
                        
!leaderAddress.equals(AkkaUtils.getLocalAkkaURL(jobManagerName))) {
                        // We are not the leader and need to redirect
-                       Matcher matcher = 
LeaderAddressHostPattern.matcher(leaderAddress);
-
-                       if (matcher.matches()) {
-                               String redirectAddress = String.format("%s:%d", 
matcher.group(1), webMonitorPort);
-                               return redirectAddress;
-                       }
-                       else {
-                               LOG.warn("Unexpected leader address pattern {}. 
Cannot extract host.", leaderAddress);
-                       }
-               }
+                       final String hostname = jobManagerGateway.getHostname();
 
-               return null;
+                       final CompletableFuture<Integer> webMonitorPortFuture = 
jobManagerGateway.requestWebPort(timeout);
+                       final int webMonitorPort = 
webMonitorPortFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+                       return String.format("%s:%d", hostname, webMonitorPort);
+               } else {
+                       return null;
+               }
        }
 
        public static HttpResponse getRedirectResponse(String redirectAddress, 
String path, boolean httpsEnabled) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
index 4a21fec..db55169 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 
 import java.util.Map;
 
@@ -42,7 +42,7 @@ public class JarAccessDeniedHandler extends AbstractJsonRequestHandler {
        }
 
        @Override
-       public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+       public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws 
Exception {
                return ERROR_MESSAGE;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
index 2572a76..73771bd 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 
@@ -46,7 +46,7 @@ public class JarDeleteHandler extends AbstractJsonRequestHandler {
        }
 
        @Override
-       public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+       public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws 
Exception {
                final String file = pathParams.get("jarid");
                try {
                        File[] list = jarDir.listFiles(new FilenameFilter() {

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
index 4dd20b1..4f9b188 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.client.program.PackagedProgram;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler;
 
 import com.fasterxml.jackson.core.JsonGenerator;
@@ -51,7 +51,7 @@ public class JarListHandler extends AbstractJsonRequestHandler {
        }
 
        @Override
-       public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+       public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws 
Exception {
                try {
                        StringWriter writer = new StringWriter();
                        JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
index 1b25e7f..b239160 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 
@@ -45,7 +45,7 @@ public class JarPlanHandler extends JarActionHandler {
        }
 
        @Override
-       public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+       public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws 
Exception {
                try {
                        JarActionHandlerConfig config = 
JarActionHandlerConfig.fromParams(pathParams, queryParams);
                        JobGraph graph = getJobGraphAndClassLoader(config).f0;

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
index 282fea8..12ffa4f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -22,11 +22,11 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
 import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.util.Preconditions;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 
@@ -34,8 +34,6 @@ import java.io.File;
 import java.io.StringWriter;
 import java.util.Map;
 
-import scala.concurrent.duration.FiniteDuration;
-
 /**
  * This handler handles requests to fetch plan for a jar.
  */
@@ -43,13 +41,13 @@ public class JarRunHandler extends JarActionHandler {
 
        static final String JAR_RUN_REST_PATH = "/jars/:jarid/run";
 
-       private final FiniteDuration timeout;
+       private final Time timeout;
        private final Configuration clientConfig;
 
-       public JarRunHandler(File jarDirectory, FiniteDuration timeout, 
Configuration clientConfig) {
+       public JarRunHandler(File jarDirectory, Time timeout, Configuration 
clientConfig) {
                super(jarDirectory);
-               this.timeout = timeout;
-               this.clientConfig = clientConfig;
+               this.timeout = Preconditions.checkNotNull(timeout);
+               this.clientConfig = Preconditions.checkNotNull(clientConfig);
        }
 
        @Override
@@ -58,17 +56,17 @@ public class JarRunHandler extends JarActionHandler {
        }
 
        @Override
-       public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+       public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws 
Exception {
                try {
                        JarActionHandlerConfig config = 
JarActionHandlerConfig.fromParams(pathParams, queryParams);
                        Tuple2<JobGraph, ClassLoader> graph = 
getJobGraphAndClassLoader(config);
 
                        try {
                                JobClient.submitJobDetached(
-                                       new AkkaJobManagerGateway(jobManager),
+                                       jobManagerGateway,
                                        clientConfig,
                                        graph.f0,
-                                       Time.milliseconds(timeout.toMillis()),
+                                       timeout,
                                        graph.f1);
                        } catch (JobExecutionException e) {
                                throw new ProgramInvocationException("Failed to 
submit the job to the job manager", e);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
index 745a110..705c321 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 
 import java.io.File;
 import java.util.Map;
@@ -44,9 +44,9 @@ public class JarUploadHandler extends AbstractJsonRequestHandler {
 
        @Override
        public String handleJsonRequest(
-                               Map<String, String> pathParams,
-                               Map<String, String> queryParams,
-                               ActorGateway jobManager) throws Exception {
+                       Map<String, String> pathParams,
+                       Map<String, String> queryParams,
+                       JobManagerGateway jobManagerGateway) throws Exception {
 
                String tempFilePath = queryParams.get("filepath");
                String filename = queryParams.get("filename");

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
index d9de7d7..513dc08 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
@@ -19,8 +19,9 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 
 import java.util.Map;
@@ -33,17 +34,23 @@ public class JobCancellationHandler extends AbstractJsonRequestHandler {
        private static final String JOB_CONCELLATION_REST_PATH = 
"/jobs/:jobid/cancel";
        private static final String JOB_CONCELLATION_YARN_REST_PATH = 
"/jobs/:jobid/yarn-cancel";
 
+       private final Time timeout;
+
+       public JobCancellationHandler(Time timeout) {
+               this.timeout = Preconditions.checkNotNull(timeout);
+       }
+
        @Override
        public String[] getPaths() {
                return new String[]{JOB_CONCELLATION_REST_PATH, 
JOB_CONCELLATION_YARN_REST_PATH};
        }
 
        @Override
-       public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+       public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws 
Exception {
                try {
-                       JobID jobid = new 
JobID(StringUtils.hexStringToByte(pathParams.get("jobid")));
-                       if (jobManager != null) {
-                               jobManager.tell(new 
JobManagerMessages.CancelJob(jobid));
+                       JobID jobId = new 
JobID(StringUtils.hexStringToByte(pathParams.get("jobid")));
+                       if (jobManagerGateway != null) {
+                               jobManagerGateway.cancelJob(jobId, timeout);
                                return "{}";
                        }
                        else {

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
index 7dd4a52..9b474aa 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
@@ -19,16 +19,17 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import 
org.apache.flink.runtime.messages.JobManagerMessages.CancelJobWithSavepoint;
-import 
org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
-import 
org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.NotFoundException;
+import org.apache.flink.util.FlinkException;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
@@ -37,7 +38,6 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
 
-import akka.dispatch.OnComplete;
 import com.fasterxml.jackson.core.JsonGenerator;
 
 import javax.annotation.Nullable;
@@ -48,10 +48,9 @@ import java.nio.charset.Charset;
 import java.util.ArrayDeque;
 import java.util.HashMap;
 import java.util.Map;
-
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -92,16 +91,16 @@ public class JobCancellationWithSavepointHandlers {
 
        public JobCancellationWithSavepointHandlers(
                        ExecutionGraphHolder currentGraphs,
-                       ExecutionContext executionContext) {
-               this(currentGraphs, executionContext, null);
+                       Executor executor) {
+               this(currentGraphs, executor, null);
        }
 
        public JobCancellationWithSavepointHandlers(
                        ExecutionGraphHolder currentGraphs,
-                       ExecutionContext executionContext,
+                       Executor executor,
                        @Nullable String defaultSavepointDirectory) {
 
-               this.triggerHandler = new TriggerHandler(currentGraphs, 
executionContext);
+               this.triggerHandler = new TriggerHandler(currentGraphs, 
executor);
                this.inProgressHandler = new InProgressHandler();
                this.defaultSavepointDirectory = defaultSavepointDirectory;
        }
@@ -127,11 +126,11 @@ public class JobCancellationWithSavepointHandlers {
                private final ExecutionGraphHolder currentGraphs;
 
                /** Execution context for futures. */
-               private final ExecutionContext executionContext;
+               private final Executor executor;
 
-               public TriggerHandler(ExecutionGraphHolder currentGraphs, 
ExecutionContext executionContext) {
+               public TriggerHandler(ExecutionGraphHolder currentGraphs, 
Executor executor) {
                        this.currentGraphs = checkNotNull(currentGraphs);
-                       this.executionContext = checkNotNull(executionContext);
+                       this.executor = checkNotNull(executor);
                }
 
                @Override
@@ -144,35 +143,40 @@ public class JobCancellationWithSavepointHandlers {
                public FullHttpResponse handleRequest(
                                Map<String, String> pathParams,
                                Map<String, String> queryParams,
-                               ActorGateway jobManager) throws Exception {
+                               JobManagerGateway jobManagerGateway) throws Exception {
 
                        try {
-                               if (jobManager != null) {
+                               if (jobManagerGateway != null) {
                                        JobID jobId = JobID.fromHexString(pathParams.get("jobid"));
+                                       final Optional<AccessExecutionGraph> optGraph;
 
-                                       AccessExecutionGraph graph = currentGraphs.getExecutionGraph(jobId, jobManager);
-                                       if (graph == null) {
-                                               throw new Exception("Cannot find ExecutionGraph for job.");
-                                       } else {
-                                               CheckpointCoordinator coord = graph.getCheckpointCoordinator();
-                                               if (coord == null) {
-                                                       throw new Exception("Cannot find CheckpointCoordinator for job.");
-                                               }
+                                       try {
+                                               optGraph = currentGraphs.getExecutionGraph(jobId, jobManagerGateway);
+                                       } catch (Exception e) {
+                                               throw new FlinkException("Could not retrieve the execution with jobId " + jobId + " from the JobManager.", e);
+                                       }
 
-                                               String targetDirectory = pathParams.get("targetDirectory");
-                                               if (targetDirectory == null) {
-                                                       if (defaultSavepointDirectory == null) {
-                                                               throw new IllegalStateException("No savepoint directory configured. " +
-                                                                               "You can either specify a directory when triggering this savepoint or " +
-                                                                               "configure a cluster-wide default via key '" +
-                                                                               CoreOptions.SAVEPOINT_DIRECTORY.key() + "'.");
-                                                       } else {
-                                                               targetDirectory = defaultSavepointDirectory;
-                                                       }
-                                               }
+                                       final AccessExecutionGraph graph = optGraph.orElseThrow(
+                                               () -> new NotFoundException("Could not find ExecutionGraph with jobId " + jobId + '.'));
 
-                                               return handleNewRequest(jobManager, jobId, targetDirectory, coord.getCheckpointTimeout());
+                                       CheckpointCoordinator coord = graph.getCheckpointCoordinator();
+                                       if (coord == null) {
+                                               throw new Exception("Cannot find CheckpointCoordinator for job.");
                                        }
+
+                                       String targetDirectory = pathParams.get("targetDirectory");
+                                       if (targetDirectory == null) {
+                                               if (defaultSavepointDirectory == null) {
+                                                       throw new IllegalStateException("No savepoint directory configured. " +
+                                                                       "You can either specify a directory when triggering this savepoint or " +
+                                                                       "configure a cluster-wide default via key '" +
+                                                                       CoreOptions.SAVEPOINT_DIRECTORY.key() + "'.");
+                                               } else {
+                                                       targetDirectory = defaultSavepointDirectory;
+                                               }
+                                       }
+
+                                       return handleNewRequest(jobManagerGateway, jobId, targetDirectory, coord.getCheckpointTimeout());
                                } else {
                                        throw new Exception("No connection to the leading JobManager.");
                                }
@@ -182,7 +186,7 @@ public class JobCancellationWithSavepointHandlers {
                }
 
                @SuppressWarnings("unchecked")
-               private FullHttpResponse handleNewRequest(ActorGateway jobManager, final JobID jobId, String targetDirectory, long checkpointTimeout) throws IOException {
+               private FullHttpResponse handleNewRequest(JobManagerGateway jobManagerGateway, final JobID jobId, String targetDirectory, long checkpointTimeout) throws IOException {
                        // Check whether a request exists
                        final long requestId;
                        final boolean isNewRequest;
@@ -202,35 +206,21 @@ public class JobCancellationWithSavepointHandlers {
 
                                try {
                                        // Trigger cancellation
-                                       Object msg = new CancelJobWithSavepoint(jobId, targetDirectory);
-                                       Future<Object> cancelFuture = jobManager
-                                                       .ask(msg, FiniteDuration.apply(checkpointTimeout, "ms"));
-
-                                       cancelFuture.onComplete(new OnComplete<Object>() {
-                                               @Override
-                                               public void onComplete(Throwable failure, Object resp) throws Throwable {
-                                                       synchronized (lock) {
-                                                               try {
-                                                                       if (resp != null) {
-                                                                               if (resp.getClass() == CancellationSuccess.class) {
-                                                                                       String path = ((CancellationSuccess) resp).savepointPath();
-                                                                                       completed.put(requestId, path);
-                                                                               } else if (resp.getClass() == CancellationFailure.class) {
-                                                                                       Throwable cause = ((CancellationFailure) resp).cause();
-                                                                                       completed.put(requestId, cause);
-                                                                               } else {
-                                                                                       Throwable cause = new IllegalStateException("Unexpected CancellationResponse of type " + resp.getClass());
-                                                                                       completed.put(requestId, cause);
-                                                                               }
-                                                                       } else {
-                                                                               completed.put(requestId, failure);
-                                                                       }
-                                                               } finally {
-                                                                       inProgress.remove(jobId);
+                                       CompletableFuture<String> cancelJobFuture = jobManagerGateway
+                                               .cancelJobWithSavepoint(jobId, targetDirectory, Time.milliseconds(checkpointTimeout));
+
+                                       cancelJobFuture.whenCompleteAsync(
+                                               (String path, Throwable throwable) -> {
+                                                       try {
+                                                               if (throwable != null) {
+                                                                       completed.put(requestId, throwable);
+                                                               } else {
+                                                                       completed.put(requestId, path);
                                                                }
+                                                       } finally {
+                                                               inProgress.remove(jobId);
                                                        }
-                                               }
-                                       }, executionContext);
+                                               }, executor);
 
                                        success = true;
                                } finally {
@@ -298,9 +288,9 @@ public class JobCancellationWithSavepointHandlers {
 
                @Override
                @SuppressWarnings("unchecked")
-               public FullHttpResponse handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+               public FullHttpResponse handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
                        try {
-                               if (jobManager != null) {
+                               if (jobManagerGateway != null) {
                                        JobID jobId = JobID.fromHexString(pathParams.get("jobid"));
                                        long requestId = Long.parseLong(pathParams.get("requestId"));
 

Reply via email to