http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandlerTest.java
index f419908..d992b85 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandlerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandlerTest.java
@@ -18,16 +18,20 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+
 import org.junit.Assert;
 import org.junit.Test;
 
+import static org.mockito.Mockito.mock;
+
 /**
  * Tests for the SubtaskCurrentAttemptDetailsHandler.
  */
 public class SubtaskCurrentAttemptDetailsHandlerTest {
        @Test
        public void testGetPaths() {
-               SubtaskCurrentAttemptDetailsHandler handler = new 
SubtaskCurrentAttemptDetailsHandler(null, null);
+               SubtaskCurrentAttemptDetailsHandler handler = new 
SubtaskCurrentAttemptDetailsHandler(mock(ExecutionGraphHolder.class), null);
                String[] paths = handler.getPaths();
                Assert.assertEquals(1, paths.length);
                
Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum", 
paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
index 74a19a9..ce8e72f 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.webmonitor.handlers;
 import org.apache.flink.runtime.executiongraph.AccessExecution;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
@@ -33,6 +34,8 @@ import org.junit.Test;
 import java.io.IOException;
 import java.util.Collection;
 
+import static org.mockito.Mockito.mock;
+
 /**
  * Tests for the SubtaskExecutionAttemptAccumulatorsHandler.
  */
@@ -61,7 +64,7 @@ public class SubtaskExecutionAttemptAccumulatorsHandlerTest {
 
        @Test
        public void testGetPaths() {
-               SubtaskExecutionAttemptAccumulatorsHandler handler = new 
SubtaskExecutionAttemptAccumulatorsHandler(null);
+               SubtaskExecutionAttemptAccumulatorsHandler handler = new 
SubtaskExecutionAttemptAccumulatorsHandler(mock(ExecutionGraphHolder.class));
                String[] paths = handler.getPaths();
                Assert.assertEquals(1, paths.length);
                
Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators",
 paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java
index a9161b3..e1fbf92 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecution;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
@@ -34,6 +35,8 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.Iterator;
 
+import static org.mockito.Mockito.mock;
+
 /**
  * Tests for the SubtaskExecutionAttemptDetailsHandler.
  */
@@ -70,7 +73,7 @@ public class SubtaskExecutionAttemptDetailsHandlerTest {
 
        @Test
        public void testGetPaths() {
-               SubtaskExecutionAttemptDetailsHandler handler = new 
SubtaskExecutionAttemptDetailsHandler(null, null);
+               SubtaskExecutionAttemptDetailsHandler handler = new 
SubtaskExecutionAttemptDetailsHandler(mock(ExecutionGraphHolder.class), null);
                String[] paths = handler.getPaths();
                Assert.assertEquals(1, paths.length);
                
Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt",
 paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java
index 6022be2..f33da80 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.webmonitor.handlers;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
@@ -33,6 +34,8 @@ import org.junit.Test;
 import java.io.IOException;
 import java.util.Collection;
 
+import static org.mockito.Mockito.mock;
+
 /**
  * Tests for the SubtasksAllAccumulatorsHandler.
  */
@@ -55,7 +58,7 @@ public class SubtasksAllAccumulatorsHandlerTest {
 
        @Test
        public void testGetPaths() {
-               SubtasksAllAccumulatorsHandler handler = new 
SubtasksAllAccumulatorsHandler(null);
+               SubtasksAllAccumulatorsHandler handler = new 
SubtasksAllAccumulatorsHandler(mock(ExecutionGraphHolder.class));
                String[] paths = handler.getPaths();
                Assert.assertEquals(1, paths.length);
                
Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/accumulators", 
paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java
index 22a2d27..548efaf 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecution;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
@@ -34,6 +35,8 @@ import org.junit.Test;
 import java.io.IOException;
 import java.util.Collection;
 
+import static org.mockito.Mockito.mock;
+
 /**
  * Tests for the SubtasksTimesHandler.
  */
@@ -56,7 +59,7 @@ public class SubtasksTimesHandlerTest {
 
        @Test
        public void testGetPaths() {
-               SubtasksTimesHandler handler = new SubtasksTimesHandler(null);
+               SubtasksTimesHandler handler = new 
SubtasksTimesHandler(mock(ExecutionGraphHolder.class));
                String[] paths = handler.getPaths();
                Assert.assertEquals(1, paths.length);
                
Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasktimes", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
index 5846d75..cf59f05 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
@@ -21,17 +21,16 @@ package org.apache.flink.runtime.webmonitor.handlers;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
@@ -46,19 +45,12 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicReference;
 
-import scala.Option;
-import scala.collection.JavaConverters;
-import scala.concurrent.ExecutionContext$;
-import scala.concurrent.ExecutionContextExecutor;
-import scala.concurrent.Future$;
-import scala.concurrent.duration.FiniteDuration;
-
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.isA;
 import static org.powermock.api.mockito.PowerMockito.mock;
@@ -72,9 +64,9 @@ public class TaskManagerLogHandlerTest {
        public void testGetPaths() {
                TaskManagerLogHandler handlerLog = new TaskManagerLogHandler(
                        mock(JobManagerRetriever.class),
-                       mock(ExecutionContextExecutor.class),
-                       Future$.MODULE$.successful("/jm/address"),
-                       AkkaUtils.getDefaultClientTimeout(),
+                       Executors.directExecutor(),
+                       CompletableFuture.completedFuture("/jm/address"),
+                       TestingUtils.TIMEOUT(),
                        TaskManagerLogHandler.FileMode.LOG,
                        new Configuration(),
                        false,
@@ -85,9 +77,9 @@ public class TaskManagerLogHandlerTest {
 
                TaskManagerLogHandler handlerOut = new TaskManagerLogHandler(
                        mock(JobManagerRetriever.class),
-                       mock(ExecutionContextExecutor.class),
-                       Future$.MODULE$.successful("/jm/address"),
-                       AkkaUtils.getDefaultClientTimeout(),
+                       Executors.directExecutor(),
+                       CompletableFuture.completedFuture("/jm/address"),
+                       TestingUtils.TIMEOUT(),
                        TaskManagerLogHandler.FileMode.STDOUT,
                        new Configuration(),
                        false,
@@ -115,27 +107,21 @@ public class TaskManagerLogHandlerTest {
 
                // ========= setup JobManager 
==================================================================================
 
-               ActorGateway jobManagerGateway = mock(ActorGateway.class);
-               Object registeredTaskManagersAnswer = new 
JobManagerMessages.RegisteredTaskManagers(
-                       
JavaConverters.collectionAsScalaIterableConverter(Collections.singletonList(taskManager)).asScala());
-
-               
when(jobManagerGateway.ask(isA(JobManagerMessages.RequestRegisteredTaskManagers$.class),
 any(FiniteDuration.class)))
-                       
.thenReturn(Future$.MODULE$.successful(registeredTaskManagersAnswer));
-               
when(jobManagerGateway.ask(isA(JobManagerMessages.getRequestBlobManagerPort().getClass()),
 any(FiniteDuration.class)))
-                       .thenReturn(Future$.MODULE$.successful((Object) 5));
-               
when(jobManagerGateway.ask(isA(JobManagerMessages.RequestTaskManagerInstance.class),
 any(FiniteDuration.class)))
-                       .thenReturn(Future$.MODULE$.successful((Object) new 
JobManagerMessages.TaskManagerInstance(Option.apply(taskManager))));
-               when(jobManagerGateway.path()).thenReturn("/jm/address");
+               JobManagerGateway jobManagerGateway = 
mock(JobManagerGateway.class);
+               
when(jobManagerGateway.requestBlobServerPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(1337));
+               when(jobManagerGateway.getHostname()).thenReturn("localhost");
+               
when(jobManagerGateway.requestTaskManagerInstance(any(InstanceID.class), 
any(Time.class))).thenReturn(
+                       
CompletableFuture.completedFuture(Optional.of(taskManager)));
 
                JobManagerRetriever retriever = mock(JobManagerRetriever.class);
-               when(retriever.getJobManagerGatewayAndWebPort())
-                       .thenReturn(Option.apply(new scala.Tuple2<ActorGateway, 
Integer>(jobManagerGateway, 0)));
+               when(retriever.getJobManagerGatewayNow())
+                       .thenReturn(Optional.of(jobManagerGateway));
 
                TaskManagerLogHandler handler = new TaskManagerLogHandler(
                        retriever,
-                       
ExecutionContext$.MODULE$.fromExecutor(Executors.directExecutor()),
-                       Future$.MODULE$.successful("/jm/address"),
-                       AkkaUtils.getDefaultClientTimeout(),
+                       Executors.directExecutor(),
+                       CompletableFuture.completedFuture("/jm/address"),
+                       TestingUtils.TIMEOUT(),
                        TaskManagerLogHandler.FileMode.LOG,
                        new Configuration(),
                        false,

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandlerTest.java
index 17e7e9d..2992d91 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandlerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandlerTest.java
@@ -18,14 +18,13 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.api.common.time.Time;
+
 import com.google.common.collect.Lists;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import scala.concurrent.duration.FiniteDuration;
 
 /**
  * Tests for the TaskManagersHandler.
@@ -33,7 +32,7 @@ import scala.concurrent.duration.FiniteDuration;
 public class TaskManagersHandlerTest {
        @Test
        public void testGetPaths() {
-               TaskManagersHandler handler = new TaskManagersHandler(new 
FiniteDuration(0, TimeUnit.SECONDS), null);
+               TaskManagersHandler handler = new 
TaskManagersHandler(Time.seconds(0L), null);
                String[] paths = handler.getPaths();
                Assert.assertEquals(2, paths.length);
                List<String> pathsList = Lists.newArrayList(paths);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
index b032061..90e032d 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
@@ -18,17 +18,17 @@
 
 package org.apache.flink.runtime.webmonitor.metrics;
 
-import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.util.TestLogger;
 
-import akka.actor.ActorSystem;
 import org.junit.Test;
 
 import java.util.HashMap;
 import java.util.Map;
 
-import scala.concurrent.ExecutionContext;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 import static org.powermock.api.mockito.PowerMockito.mock;
@@ -42,7 +42,11 @@ public class AbstractMetricsHandlerTest extends TestLogger {
         */
        @Test
        public void testHandleRequest() throws Exception {
-               MetricFetcher fetcher = new 
MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), 
mock(ExecutionContext.class));
+               MetricFetcher fetcher = new MetricFetcher(
+                       mock(JobManagerRetriever.class),
+                       mock(MetricQueryServiceRetriever.class),
+                       Executors.directExecutor(),
+                       TestingUtils.TIMEOUT());
                MetricStoreTest.setupStore(fetcher.getMetricStore());
 
                JobVertexMetricsHandler handler = new 
JobVertexMetricsHandler(fetcher);
@@ -91,7 +95,11 @@ public class AbstractMetricsHandlerTest extends TestLogger {
         */
        @Test
        public void testInvalidListDoesNotFail() {
-               MetricFetcher fetcher = new 
MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), 
mock(ExecutionContext.class));
+               MetricFetcher fetcher = new MetricFetcher(
+                       mock(JobManagerRetriever.class),
+                       mock(MetricQueryServiceRetriever.class),
+                       Executors.directExecutor(),
+                       TestingUtils.TIMEOUT());
                MetricStoreTest.setupStore(fetcher.getMetricStore());
 
                JobVertexMetricsHandler handler = new 
JobVertexMetricsHandler(fetcher);
@@ -117,7 +125,11 @@ public class AbstractMetricsHandlerTest extends TestLogger 
{
         */
        @Test
        public void testInvalidGetDoesNotFail() {
-               MetricFetcher fetcher = new 
MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), 
mock(ExecutionContext.class));
+               MetricFetcher fetcher = new MetricFetcher(
+                       mock(JobManagerRetriever.class),
+                       mock(MetricQueryServiceRetriever.class),
+                       Executors.directExecutor(),
+                       TestingUtils.TIMEOUT());
                MetricStoreTest.setupStore(fetcher.getMetricStore());
 
                JobVertexMetricsHandler handler = new 
JobVertexMetricsHandler(fetcher);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
index 97c2055..994fc5e 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
@@ -18,18 +18,18 @@
 
 package org.apache.flink.runtime.webmonitor.metrics;
 
-import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.util.TestLogger;
 
-import akka.actor.ActorSystem;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.HashMap;
 import java.util.Map;
 
-import scala.concurrent.ExecutionContext;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.powermock.api.mockito.PowerMockito.mock;
@@ -48,7 +48,11 @@ public class JobManagerMetricsHandlerTest extends TestLogger 
{
 
        @Test
        public void getMapFor() {
-               MetricFetcher fetcher = new 
MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), 
mock(ExecutionContext.class));
+               MetricFetcher fetcher = new MetricFetcher(
+                       mock(JobManagerRetriever.class),
+                       mock(MetricQueryServiceRetriever.class),
+                       Executors.directExecutor(),
+                       TestingUtils.TIMEOUT());
                MetricStore store = 
MetricStoreTest.setupStore(fetcher.getMetricStore());
 
                JobManagerMetricsHandler handler = new 
JobManagerMetricsHandler(fetcher);
@@ -62,7 +66,11 @@ public class JobManagerMetricsHandlerTest extends TestLogger 
{
 
        @Test
        public void getMapForNull() {
-               MetricFetcher fetcher = new 
MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), 
mock(ExecutionContext.class));
+               MetricFetcher fetcher = new MetricFetcher(
+                       mock(JobManagerRetriever.class),
+                       mock(MetricQueryServiceRetriever.class),
+                       Executors.directExecutor(),
+                       TestingUtils.TIMEOUT());
                MetricStore store = fetcher.getMetricStore();
 
                JobManagerMetricsHandler handler = new 
JobManagerMetricsHandler(fetcher);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java
index 53666eb..a35af22 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java
@@ -18,18 +18,18 @@
 
 package org.apache.flink.runtime.webmonitor.metrics;
 
-import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.util.TestLogger;
 
-import akka.actor.ActorSystem;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.HashMap;
 import java.util.Map;
 
-import scala.concurrent.ExecutionContext;
-
 import static 
org.apache.flink.runtime.webmonitor.metrics.JobMetricsHandler.PARAMETER_JOB_ID;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
@@ -49,7 +49,11 @@ public class JobMetricsHandlerTest extends TestLogger {
 
        @Test
        public void getMapFor() throws Exception {
-               MetricFetcher fetcher = new 
MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), 
mock(ExecutionContext.class));
+               MetricFetcher fetcher = new MetricFetcher(
+                       mock(JobManagerRetriever.class),
+                       mock(MetricQueryServiceRetriever.class),
+                       Executors.directExecutor(),
+                       TestingUtils.TIMEOUT());
                MetricStore store = 
MetricStoreTest.setupStore(fetcher.getMetricStore());
 
                JobMetricsHandler handler = new JobMetricsHandler(fetcher);
@@ -64,7 +68,11 @@ public class JobMetricsHandlerTest extends TestLogger {
 
        @Test
        public void getMapForNull() {
-               MetricFetcher fetcher = new 
MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), 
mock(ExecutionContext.class));
+               MetricFetcher fetcher = new MetricFetcher(
+                       mock(JobManagerRetriever.class),
+                       mock(MetricQueryServiceRetriever.class),
+                       Executors.directExecutor(),
+                       TestingUtils.TIMEOUT());
                MetricStore store = fetcher.getMetricStore();
 
                JobMetricsHandler handler = new JobMetricsHandler(fetcher);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java
index 5f68c6f..e84b11d 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java
@@ -18,18 +18,18 @@
 
 package org.apache.flink.runtime.webmonitor.metrics;
 
-import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.util.TestLogger;
 
-import akka.actor.ActorSystem;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.HashMap;
 import java.util.Map;
 
-import scala.concurrent.ExecutionContext;
-
 import static 
org.apache.flink.runtime.webmonitor.metrics.JobMetricsHandler.PARAMETER_JOB_ID;
 import static 
org.apache.flink.runtime.webmonitor.metrics.JobVertexMetricsHandler.PARAMETER_VERTEX_ID;
 import static org.junit.Assert.assertEquals;
@@ -50,7 +50,11 @@ public class JobVertexMetricsHandlerTest extends TestLogger {
 
        @Test
        public void getMapFor() throws Exception {
-               MetricFetcher fetcher = new 
MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), 
mock(ExecutionContext.class));
+               MetricFetcher fetcher = new MetricFetcher(
+                       mock(JobManagerRetriever.class),
+                       mock(MetricQueryServiceRetriever.class),
+                       Executors.directExecutor(),
+                       TestingUtils.TIMEOUT());
                MetricStore store = 
MetricStoreTest.setupStore(fetcher.getMetricStore());
 
                JobVertexMetricsHandler handler = new 
JobVertexMetricsHandler(fetcher);
@@ -68,7 +72,11 @@ public class JobVertexMetricsHandlerTest extends TestLogger {
 
        @Test
        public void getMapForNull() {
-               MetricFetcher fetcher = new 
MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), 
mock(ExecutionContext.class));
+               MetricFetcher fetcher = new MetricFetcher(
+                       mock(JobManagerRetriever.class),
+                       mock(MetricQueryServiceRetriever.class),
+                       Executors.directExecutor(),
+                       TestingUtils.TIMEOUT());
                MetricStore store = fetcher.getMetricStore();
 
                JobVertexMetricsHandler handler = new 
JobVertexMetricsHandler(fetcher);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
index 369e8aa..4c91997 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.metrics;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
@@ -26,49 +27,39 @@ import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
-import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
 import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
 import org.apache.flink.runtime.metrics.dump.MetricQueryService;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.util.TestingHistogram;
-import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway;
+import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+import 
org.apache.flink.runtime.webmonitor.retriever.impl.AkkaJobManagerRetriever;
 import org.apache.flink.util.TestLogger;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
-import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.Executor;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 
-import scala.Option;
-import scala.collection.JavaConverters;
-import scala.concurrent.ExecutionContext$;
-import scala.concurrent.ExecutionContextExecutor;
-import scala.concurrent.Future$;
-import scala.concurrent.duration.FiniteDuration;
-
-import static 
org.apache.flink.runtime.metrics.dump.MetricQueryService.METRIC_QUERY_SERVICE_NAME;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.isA;
 import static org.powermock.api.mockito.PowerMockito.mock;
 import static org.powermock.api.mockito.PowerMockito.when;
-import static org.powermock.api.mockito.PowerMockito.whenNew;
 
 /**
  * Tests for the MetricFetcher.
@@ -78,6 +69,8 @@ import static org.powermock.api.mockito.PowerMockito.whenNew;
 public class MetricFetcherTest extends TestLogger {
        @Test
        public void testUpdate() throws Exception {
+               final Time timeout = Time.seconds(10L);
+
                // ========= setup TaskManager 
=================================================================================
                JobID jobID = new JobID();
                InstanceID tmID = new InstanceID();
@@ -94,58 +87,40 @@ public class MetricFetcherTest extends TestLogger {
                JobDetails details = mock(JobDetails.class);
                when(details.getJobId()).thenReturn(jobID);
 
-               ActorGateway jobManagerGateway = mock(ActorGateway.class);
-               Object registeredTaskManagersAnswer = new 
JobManagerMessages.RegisteredTaskManagers(
-                       
JavaConverters.collectionAsScalaIterableConverter(Collections.singletonList(taskManager)).asScala());
+               JobManagerGateway jobManagerGateway = 
mock(JobManagerGateway.class);
 
-               when(jobManagerGateway.ask(isA(RequestJobDetails.class), 
any(FiniteDuration.class)))
-                       .thenReturn(Future$.MODULE$.successful((Object) new 
MultipleJobsDetails(new JobDetails[0], new JobDetails[0])));
-               
when(jobManagerGateway.ask(isA(JobManagerMessages.RequestRegisteredTaskManagers$.class),
 any(FiniteDuration.class)))
-                       
.thenReturn(Future$.MODULE$.successful(registeredTaskManagersAnswer));
-               when(jobManagerGateway.path()).thenReturn("/jm/address");
+               when(jobManagerGateway.requestJobDetails(anyBoolean(), 
anyBoolean(), any(Time.class)))
+                       .thenReturn(CompletableFuture.completedFuture(new 
MultipleJobsDetails(new JobDetails[0], new JobDetails[0])));
+               
when(jobManagerGateway.requestTaskManagerInstances(any(Time.class)))
+                       
.thenReturn(CompletableFuture.completedFuture(Collections.singleton(taskManager)));
+               when(jobManagerGateway.getAddress()).thenReturn("/jm/address");
+               
when(jobManagerGateway.requestWebPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(0));
 
-               JobManagerRetriever retriever = mock(JobManagerRetriever.class);
-               when(retriever.getJobManagerGatewayAndWebPort())
-                       .thenReturn(Option.apply(new scala.Tuple2<ActorGateway, 
Integer>(jobManagerGateway, 0)));
+               AkkaJobManagerRetriever retriever = 
mock(AkkaJobManagerRetriever.class);
+               when(retriever.getJobManagerGatewayNow())
+                       .thenReturn(Optional.of(jobManagerGateway));
 
                // ========= setup QueryServices 
================================================================================
-               Object requestMetricsAnswer = createRequestDumpAnswer(tmID, 
jobID);
-
-               final ActorRef jmQueryService = mock(ActorRef.class);
-               final ActorRef tmQueryService = mock(ActorRef.class);
-
-               ActorSystem actorSystem = mock(ActorSystem.class);
-               when(actorSystem.actorFor(eq("/jm/" + 
METRIC_QUERY_SERVICE_NAME))).thenReturn(jmQueryService);
-               when(actorSystem.actorFor(eq("/tm/" + METRIC_QUERY_SERVICE_NAME 
+ "_" + tmRID.getResourceIdString()))).thenReturn(tmQueryService);
-
-               MetricFetcher.BasicGateway jmQueryServiceGateway = 
mock(MetricFetcher.BasicGateway.class);
-               
when(jmQueryServiceGateway.ask(any(MetricQueryService.getCreateDump().getClass()),
 any(FiniteDuration.class)))
-                       .thenReturn(Future$.MODULE$.successful((Object) new 
MetricDumpSerialization.MetricSerializationResult(new byte[0], 0, 0, 0, 0)));
-
-               MetricFetcher.BasicGateway tmQueryServiceGateway = 
mock(MetricFetcher.BasicGateway.class);
-               
when(tmQueryServiceGateway.ask(any(MetricQueryService.getCreateDump().getClass()),
 any(FiniteDuration.class)))
-                       
.thenReturn(Future$.MODULE$.successful(requestMetricsAnswer));
-
-               whenNew(MetricFetcher.BasicGateway.class)
-                       .withArguments(eq(new Object() {
-                               @Override
-                               public boolean equals(Object o) {
-                                       return o == jmQueryService;
-                               }
-                       }))
-                       .thenReturn(jmQueryServiceGateway);
-               whenNew(MetricFetcher.BasicGateway.class)
-                       .withArguments(eq(new Object() {
-                               @Override
-                               public boolean equals(Object o) {
-                                       return o == tmQueryService;
-                               }
-                       }))
-                       .thenReturn(tmQueryServiceGateway);
+               MetricQueryServiceGateway jmQueryService = 
mock(MetricQueryServiceGateway.class);
+               MetricQueryServiceGateway tmQueryService = 
mock(MetricQueryServiceGateway.class);
+
+               MetricDumpSerialization.MetricSerializationResult 
requestMetricsAnswer = createRequestDumpAnswer(tmID, jobID);
+
+               when(jmQueryService.queryMetrics(any(Time.class)))
+                       .thenReturn(CompletableFuture.completedFuture(new 
MetricDumpSerialization.MetricSerializationResult(new byte[0], 0, 0, 0, 0)));
+               when(tmQueryService.queryMetrics(any(Time.class)))
+                       
.thenReturn(CompletableFuture.completedFuture(requestMetricsAnswer));
+
+               MetricQueryServiceRetriever queryServiceRetriever = 
mock(MetricQueryServiceRetriever.class);
+               when(queryServiceRetriever.retrieveService(eq("/jm/" + 
MetricQueryService.METRIC_QUERY_SERVICE_NAME))).thenReturn(CompletableFuture.completedFuture(jmQueryService));
+               when(queryServiceRetriever.retrieveService(eq("/tm/" + 
MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + 
tmRID.getResourceIdString()))).thenReturn(CompletableFuture.completedFuture(tmQueryService));
 
                // ========= start MetricFetcher testing 
=======================================================================
-               ExecutionContextExecutor context = 
ExecutionContext$.MODULE$.fromExecutor(new CurrentThreadExecutor());
-               MetricFetcher fetcher = new MetricFetcher(actorSystem, 
retriever, context);
+               MetricFetcher fetcher = new MetricFetcher(
+                       retriever,
+                       queryServiceRetriever,
+                       Executors.directExecutor(),
+                       timeout);
 
                // verify that update fetches metrics and updates the store
                fetcher.update();
@@ -170,13 +145,7 @@ public class MetricFetcherTest extends TestLogger {
                }
        }
 
-       private static class CurrentThreadExecutor implements Executor {
-               public void execute(Runnable r) {
-                       r.run();
-               }
-       }
-
-       private static MetricDumpSerialization.MetricSerializationResult 
createRequestDumpAnswer(InstanceID tmID, JobID jobID) throws IOException {
+       private static MetricDumpSerialization.MetricSerializationResult 
createRequestDumpAnswer(InstanceID tmID, JobID jobID) {
                Map<Counter, Tuple2<QueryScopeInfo, String>> counters = new 
HashMap<>();
                Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges = new 
HashMap<>();
                Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms = new 
HashMap<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
index 4333f04..c20ea98 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
@@ -18,18 +18,18 @@
 
 package org.apache.flink.runtime.webmonitor.metrics;
 
-import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.util.TestLogger;
 
-import akka.actor.ActorSystem;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.HashMap;
 import java.util.Map;
 
-import scala.concurrent.ExecutionContext;
-
 import static 
org.apache.flink.runtime.webmonitor.handlers.TaskManagersHandler.TASK_MANAGER_ID_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
@@ -49,7 +49,11 @@ public class TaskManagerMetricsHandlerTest extends 
TestLogger {
 
        @Test
        public void getMapFor() throws Exception {
-               MetricFetcher fetcher = new 
MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), 
mock(ExecutionContext.class));
+               MetricFetcher fetcher = new MetricFetcher(
+                       mock(JobManagerRetriever.class),
+                       mock(MetricQueryServiceRetriever.class),
+                       Executors.directExecutor(),
+                       TestingUtils.TIMEOUT());
                MetricStore store = 
MetricStoreTest.setupStore(fetcher.getMetricStore());
 
                TaskManagerMetricsHandler handler = new 
TaskManagerMetricsHandler(fetcher);
@@ -64,7 +68,11 @@ public class TaskManagerMetricsHandlerTest extends 
TestLogger {
 
        @Test
        public void getMapForNull() {
-               MetricFetcher fetcher = new 
MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), 
mock(ExecutionContext.class));
+               MetricFetcher fetcher = new MetricFetcher(
+                       mock(JobManagerRetriever.class),
+                       mock(MetricQueryServiceRetriever.class),
+                       Executors.directExecutor(),
+                       TestingUtils.TIMEOUT());
                MetricStore store = fetcher.getMetricStore();
 
                TaskManagerMetricsHandler handler = new 
TaskManagerMetricsHandler(fetcher);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
index 6ee78dd..bbc5889 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
@@ -22,13 +22,23 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
+import org.apache.flink.runtime.messages.webmonitor.RequestJobsWithIDsOverview;
+import org.apache.flink.runtime.messages.webmonitor.RequestStatusOverview;
+import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
 import org.apache.flink.util.Preconditions;
 
+import java.util.Collection;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -37,7 +47,8 @@ import scala.Option;
 import scala.reflect.ClassTag$;
 
 /**
- * Implementation of the {@link JobManagerGateway} for the {@link 
ActorGateway}.
+ * Implementation of the {@link JobManagerGateway} for old JobManager code 
based
+ * on Akka actors and the {@link ActorGateway}.
  */
 public class AkkaJobManagerGateway implements JobManagerGateway {
 
@@ -48,7 +59,6 @@ public class AkkaJobManagerGateway implements 
JobManagerGateway {
                this.jobManagerGateway = 
Preconditions.checkNotNull(jobManagerGateway);
 
                final Option<String> optHostname = 
jobManagerGateway.actor().path().address().host();
-
                hostname = optHostname.isDefined() ? optHostname.get() : 
"localhost";
        }
 
@@ -63,25 +73,6 @@ public class AkkaJobManagerGateway implements 
JobManagerGateway {
        }
 
        @Override
-       public 
CompletableFuture<Optional<JobManagerMessages.ClassloadingProps>> 
requestClassloadingProps(JobID jobId, Time timeout) {
-               return FutureUtils
-                       .toJava(jobManagerGateway
-                               .ask(
-                                       new 
JobManagerMessages.RequestClassloadingProps(jobId),
-                                       FutureUtils.toFiniteDuration(timeout)))
-                       .thenApply(
-                               (Object response) -> {
-                                       if (response instanceof 
JobManagerMessages.ClassloadingProps) {
-                                               return 
Optional.of(((JobManagerMessages.ClassloadingProps) response));
-                                       } else if (response instanceof 
JobManagerMessages.JobNotFound) {
-                                               return Optional.empty();
-                                       } else {
-                                               throw new 
FlinkFutureException("Unknown response: " + response + '.');
-                                       }
-                               });
-       }
-
-       @Override
        public CompletableFuture<Integer> requestBlobServerPort(Time timeout) {
                return FutureUtils.toJava(
                        jobManagerGateway
@@ -90,6 +81,21 @@ public class AkkaJobManagerGateway implements 
JobManagerGateway {
        }
 
        @Override
+       public CompletableFuture<Integer> requestWebPort(Time timeout) {
+               CompletableFuture<JobManagerMessages.ResponseWebMonitorPort> 
portResponseFuture = FutureUtils.toJava(
+                       jobManagerGateway
+                               
.ask(JobManagerMessages.getRequestWebMonitorPort(), 
FutureUtils.toFiniteDuration(timeout))
+                               
.mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.ResponseWebMonitorPort.class)));
+
+               return portResponseFuture.thenApply(
+                       JobManagerMessages.ResponseWebMonitorPort::port);
+       }
+
+       
//--------------------------------------------------------------------------------
+       // Job control
+       
//--------------------------------------------------------------------------------
+
+       @Override
        public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, 
ListeningBehaviour listeningBehaviour, Time timeout) {
                return FutureUtils
                        .toJava(
@@ -119,4 +125,146 @@ public class AkkaJobManagerGateway implements 
JobManagerGateway {
                                }
                        );
        }
+
+       @Override
+       public CompletableFuture<String> cancelJobWithSavepoint(JobID jobId, 
String savepointPath, Time timeout) {
+               CompletableFuture<JobManagerMessages.CancellationResponse> 
cancellationFuture = FutureUtils.toJava(
+                       jobManagerGateway
+                               .ask(new 
JobManagerMessages.CancelJobWithSavepoint(jobId, savepointPath), 
FutureUtils.toFiniteDuration(timeout))
+                               
.mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.CancellationResponse.class)));
+
+               return cancellationFuture.thenApply(
+                       (JobManagerMessages.CancellationResponse response) -> {
+                               if (response instanceof 
JobManagerMessages.CancellationSuccess) {
+                                       return 
((JobManagerMessages.CancellationSuccess) response).savepointPath();
+                               } else {
+                                       throw new FlinkFutureException("Cancel 
with savepoint failed.", ((JobManagerMessages.CancellationFailure) 
response).cause());
+                               }
+                       });
+       }
+
+       @Override
+       public CompletableFuture<Acknowledge> cancelJob(JobID jobId, Time 
timeout) {
+               CompletableFuture<JobManagerMessages.CancellationResponse> 
responseFuture = FutureUtils.toJava(
+                       jobManagerGateway
+                               .ask(new JobManagerMessages.CancelJob(jobId), 
FutureUtils.toFiniteDuration(timeout))
+                               
.mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.CancellationResponse.class)));
+
+               return responseFuture.thenApply(
+                       (JobManagerMessages.CancellationResponse response) -> {
+                               if (response instanceof 
JobManagerMessages.CancellationSuccess) {
+                                       return Acknowledge.get();
+                               } else {
+                                       throw new FlinkFutureException("Cancel 
job failed " + jobId + '.', ((JobManagerMessages.CancellationFailure) 
response).cause());
+                               }
+                       });
+       }
+
+       @Override
+       public CompletableFuture<Acknowledge> stopJob(JobID jobId, Time 
timeout) {
+               CompletableFuture<JobManagerMessages.StoppingResponse> 
responseFuture = FutureUtils.toJava(
+                       jobManagerGateway
+                               .ask(new JobManagerMessages.StopJob(jobId), 
FutureUtils.toFiniteDuration(timeout))
+                               
.mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.StoppingResponse.class)));
+
+               return responseFuture.thenApply(
+                       (JobManagerMessages.StoppingResponse response) -> {
+                               if (response instanceof 
JobManagerMessages.StoppingSuccess) {
+                                       return Acknowledge.get();
+                               } else {
+                                       throw new FlinkFutureException("Stop 
job failed " + jobId + '.', ((JobManagerMessages.StoppingFailure) 
response).cause());
+                               }
+                       });
+       }
+
+       
//--------------------------------------------------------------------------------
+       // JobManager information
+       
//--------------------------------------------------------------------------------
+
+       @Override
+       public CompletableFuture<Optional<Instance>> 
requestTaskManagerInstance(InstanceID instanceId, Time timeout) {
+               return FutureUtils.toJava(
+                       jobManagerGateway
+                               .ask(new 
JobManagerMessages.RequestTaskManagerInstance(instanceId), 
FutureUtils.toFiniteDuration(timeout))
+                               
.mapTo(ClassTag$.MODULE$.<JobManagerMessages.TaskManagerInstance>apply(JobManagerMessages.TaskManagerInstance.class)))
+                       .thenApply(
+                               (JobManagerMessages.TaskManagerInstance 
taskManagerResponse) -> {
+                                       if 
(taskManagerResponse.instance().isDefined()) {
+                                               return 
Optional.of(taskManagerResponse.instance().get());
+                                       } else {
+                                               return Optional.empty();
+                                       }
+                               });
+       }
+
+       @Override
+       public CompletableFuture<Collection<Instance>> 
requestTaskManagerInstances(Time timeout) {
+               CompletableFuture<JobManagerMessages.RegisteredTaskManagers> 
taskManagersFuture = FutureUtils.toJava(
+                       jobManagerGateway
+                               
.ask(JobManagerMessages.getRequestRegisteredTaskManagers(), 
FutureUtils.toFiniteDuration(timeout))
+                               
.mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.RegisteredTaskManagers.class)));
+
+               return taskManagersFuture.thenApply(
+                       
JobManagerMessages.RegisteredTaskManagers::asJavaCollection);
+       }
+
+       @Override
+       public 
CompletableFuture<Optional<JobManagerMessages.ClassloadingProps>> 
requestClassloadingProps(JobID jobId, Time timeout) {
+               return FutureUtils
+                       .toJava(jobManagerGateway
+                               .ask(
+                                       new 
JobManagerMessages.RequestClassloadingProps(jobId),
+                                       FutureUtils.toFiniteDuration(timeout)))
+                       .thenApply(
+                               (Object response) -> {
+                                       if (response instanceof 
JobManagerMessages.ClassloadingProps) {
+                                               return 
Optional.of(((JobManagerMessages.ClassloadingProps) response));
+                                       } else if (response instanceof 
JobManagerMessages.JobNotFound) {
+                                               return Optional.empty();
+                                       } else {
+                                               throw new 
FlinkFutureException("Unknown response: " + response + '.');
+                                       }
+                               });
+       }
+
+       @Override
+       public CompletableFuture<MultipleJobsDetails> requestJobDetails(boolean 
includeRunning, boolean includeFinished, Time timeout) {
+               return FutureUtils.toJava(
+                       jobManagerGateway
+                               .ask(new RequestJobDetails(true, true), 
FutureUtils.toFiniteDuration(timeout))
+                               
.mapTo(ClassTag$.MODULE$.apply(MultipleJobsDetails.class)));
+       }
+
+       @Override
+       public CompletableFuture<Optional<AccessExecutionGraph>> 
requestJob(JobID jobId, Time timeout) {
+               CompletableFuture<JobManagerMessages.JobResponse> 
jobResponseFuture = FutureUtils.toJava(
+                       jobManagerGateway
+                               .ask(new JobManagerMessages.RequestJob(jobId), 
FutureUtils.toFiniteDuration(timeout))
+                               
.mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.JobResponse.class)));
+
+               return jobResponseFuture.thenApply(
+                       (JobManagerMessages.JobResponse jobResponse) -> {
+                               if (jobResponse instanceof 
JobManagerMessages.JobFound) {
+                                       return 
Optional.of(((JobManagerMessages.JobFound) jobResponse).executionGraph());
+                               } else {
+                                       return Optional.empty();
+                               }
+                       });
+       }
+
+       @Override
+       public CompletableFuture<StatusOverview> requestStatusOverview(Time 
timeout) {
+               return FutureUtils.toJava(
+                       jobManagerGateway
+                               .ask(RequestStatusOverview.getInstance(), 
FutureUtils.toFiniteDuration(timeout))
+                               
.mapTo(ClassTag$.MODULE$.apply(StatusOverview.class)));
+       }
+
+       @Override
+       public CompletableFuture<JobsWithIDsOverview> requestJobsOverview(Time 
timeout) {
+               return FutureUtils.toJava(
+                       jobManagerGateway
+                               .ask(RequestJobsWithIDsOverview.getInstance(), 
FutureUtils.toFiniteDuration(timeout))
+                               
.mapTo(ClassTag$.MODULE$.apply(JobsWithIDsOverview.class)));
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
index 562e697..19f0e2c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -421,7 +421,9 @@ public class JobClient {
 
                LOG.info("Checking and uploading JAR files");
 
-               final CompletableFuture<InetSocketAddress> 
blobServerAddressFuture = retrieveBlobServerAddress(jobManagerGateway, timeout);
+               final CompletableFuture<InetSocketAddress> 
blobServerAddressFuture = retrieveBlobServerAddress(
+                       jobManagerGateway,
+                       timeout);
 
                final InetSocketAddress blobServerAddress;
 
@@ -448,7 +450,7 @@ public class JobClient {
                                "JobManager did not respond within " + timeout, 
e);
                } catch (Throwable throwable) {
                        Throwable stripped = 
ExceptionUtils.stripExecutionException(throwable);
-
+                       
                        try {
                                ExceptionUtils.tryDeserializeAndThrow(stripped, 
classLoader);
                        } catch (JobExecutionException jee) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index f204393..d24a3d0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -18,14 +18,14 @@
 
 package org.apache.flink.runtime.clusterframework;
 
-import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
-import akka.actor.Address;
 import com.typesafe.config.Config;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
 import org.apache.commons.lang3.StringUtils;
+
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
@@ -35,6 +35,8 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.webmonitor.WebMonitor;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.util.NetUtils;
 
 import org.slf4j.Logger;
@@ -52,6 +54,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.concurrent.Executor;
 
 /**
  * Tools for starting JobManager and TaskManager processes, including the
@@ -171,7 +174,11 @@ public class BootstrapTools {
         *
         * @param config The Flink config.
         * @param highAvailabilityServices Service factory for high 
availability services
-        * @param actorSystem The ActorSystem to start the web frontend in.
+        * @param jobManagerRetriever to retrieve the leading JobManagerGateway
+        * @param queryServiceRetriever to resolve a query service
+        * @param timeout for asynchronous operations
+        * @param executor to run asynchronous operations
+        * @param jobManagerAddress the address of the JobManager for which the 
WebMonitor is started
         * @param logger Logger for log output
         * @return WebMonitor instance.
         * @throws Exception
@@ -179,16 +186,13 @@ public class BootstrapTools {
        public static WebMonitor startWebMonitorIfConfigured(
                        Configuration config,
                        HighAvailabilityServices highAvailabilityServices,
-                       ActorSystem actorSystem,
-                       ActorRef jobManager,
+                       JobManagerRetriever jobManagerRetriever,
+                       MetricQueryServiceRetriever queryServiceRetriever,
+                       Time timeout,
+                       Executor executor,
+                       String jobManagerAddress,
                        Logger logger) throws Exception {
 
-
-               // this ensures correct values are present in the web frontend
-               final Address address = AkkaUtils.getAddress(actorSystem);
-               config.setString(JobManagerOptions.ADDRESS, 
address.host().get());
-               config.setInteger(JobManagerOptions.PORT, 
Integer.parseInt(address.port().get().toString()));
-
                if (config.getInteger(WebOptions.PORT, 0) >= 0) {
                        logger.info("Starting JobManager Web Frontend");
 
@@ -197,12 +201,14 @@ public class BootstrapTools {
                        WebMonitor monitor = 
WebMonitorUtils.startWebRuntimeMonitor(
                                config,
                                highAvailabilityServices,
-                               actorSystem);
+                               jobManagerRetriever,
+                               queryServiceRetriever,
+                               timeout,
+                               executor);
 
                        // start the web monitor
                        if (monitor != null) {
-                               String jobManagerAkkaURL = 
AkkaUtils.getAkkaURL(actorSystem, jobManager);
-                               monitor.start(jobManagerAkkaURL);
+                               monitor.start(jobManagerAddress);
                        }
                        return monitor;
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index 043c603..5c6439d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -305,6 +305,16 @@ public class FutureUtils {
                return new FiniteDuration(time.toMilliseconds(), 
TimeUnit.MILLISECONDS);
        }
 
+       /**
+        * Converts {@link FiniteDuration} into Flink time.
+        *
+        * @param finiteDuration to convert into Flink time
+        * @return Flink time with the length of the given finite duration
+        */
+       public static Time toTime(FiniteDuration finiteDuration) {
+               return Time.milliseconds(finiteDuration.toMillis());
+       }
+
        // 
------------------------------------------------------------------------
        //  Converting futures
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
index cba7b06..a4d0d11 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
@@ -21,11 +21,20 @@ package org.apache.flink.runtime.jobmaster;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
 import org.apache.flink.runtime.rpc.RpcGateway;
 
+import javax.annotation.Nullable;
+
+import java.util.Collection;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
@@ -38,21 +47,19 @@ import java.util.concurrent.CompletableFuture;
 public interface JobManagerGateway extends RpcGateway {
 
        /**
-        * Requests the class loading properties for the given JobID.
+        * Requests the BlobServer port.
         *
-        * @param jobId for which the class loading properties are requested
         * @param timeout for this operation
-        * @return Future containing the optional class loading properties if 
they could be retrieved from the JobManager.
+        * @return Future containing the BlobServer port
         */
-       CompletableFuture<Optional<JobManagerMessages.ClassloadingProps>> 
requestClassloadingProps(JobID jobId, Time timeout);
+       CompletableFuture<Integer> requestBlobServerPort(Time timeout);
 
        /**
-        * Requests the BlobServer port.
+        * Returns the port of the web runtime monitor serving requests for the 
JobManager endpoint.
         *
-        * @param timeout for this operation
-        * @return Future containing the BlobServer port
+        * @return Port of the WebRuntimeMonitor responsible for the JobManager 
endpoint
         */
-       CompletableFuture<Integer> requestBlobServerPort(Time timeout);
+       CompletableFuture<Integer> requestWebPort(Time timeout);
 
        /**
         * Submits a job to the JobManager.
@@ -63,4 +70,103 @@ public interface JobManagerGateway extends RpcGateway {
         * @return Future containing an Acknowledge message if the submission 
succeeded
         */
        CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, 
ListeningBehaviour listeningBehaviour, Time timeout);
+
+       /**
+        * Cancels the given job after taking a savepoint and returning its 
path.
+        *
+        * If the savepointPath is null, then the JobManager will use the 
default savepoint directory
+        * to store the savepoint in. After the savepoint has been taken and 
the job has been canceled
+        * successfully, the path of the savepoint is returned.
+        *
+        * @param jobId identifying the job to cancel
+        * @param savepointPath Optional path for the savepoint to be stored 
under; if null, then the default path is
+        *                      taken
+        * @param timeout for the asynchronous operation
+        * @return Future containing the savepoint path of the taken savepoint 
or an Exception if the operation failed
+        */
+       CompletableFuture<String> cancelJobWithSavepoint(JobID jobId, @Nullable 
String savepointPath, Time timeout);
+
+       /**
+        * Cancels the given job.
+        *
+        * @param jobId identifying the job to cancel
+        * @param timeout for the asynchronous operation
+        * @return Future containing Acknowledge or an Exception if the 
operation failed
+        */
+       CompletableFuture<Acknowledge> cancelJob(JobID jobId, Time timeout);
+
+       /**
+        * Stops the given job.
+        *
+        * @param jobId identifying the job to cancel
+        * @param timeout for the asynchronous operation
+        * @return Future containing Acknowledge or an Exception if the 
operation failed
+        */
+       CompletableFuture<Acknowledge> stopJob(JobID jobId, Time timeout);
+
+       /**
+        * Requests the class loading properties for the given JobID.
+        *
+        * @param jobId for which the class loading properties are requested
+        * @param timeout for this operation
+        * @return Future containing the optional class loading properties if 
they could be retrieved from the JobManager.
+        */
+       CompletableFuture<Optional<JobManagerMessages.ClassloadingProps>> 
requestClassloadingProps(JobID jobId, Time timeout);
+
+       /**
+        * Requests the TaskManager instance registered under the given 
instanceId from the JobManager.
+        * If there is no Instance registered, then {@link Optional#empty()} is 
returned.
+        *
+        * @param instanceId for which to retrieve the Instance
+        * @param timeout for the asynchronous operation
+        * @return Future containing the TaskManager instance registered under 
instanceId, otherwise {@link Optional#empty()}
+        */
+       CompletableFuture<Optional<Instance>> 
requestTaskManagerInstance(InstanceID instanceId, Time timeout);
+
+       /**
+        * Requests all currently registered TaskManager instances from the 
JobManager.
+        *
+        * @param timeout for the asynchronous operation
+        * @return Future containing the collection of all currently registered 
TaskManager instances
+        */
+       CompletableFuture<Collection<Instance>> 
requestTaskManagerInstances(Time timeout);
+
+       /**
+        * Requests job details currently being executed by the JobManager.
+        *
+        * @param includeRunning true if running jobs shall be included, 
otherwise false
+        * @param includeFinished true if finished jobs shall be included, 
otherwise false
+        * @param timeout for the asynchronous operation
+        * @return Future containing the job details
+        */
+       CompletableFuture<MultipleJobsDetails> requestJobDetails(
+               boolean includeRunning,
+               boolean includeFinished,
+               Time timeout);
+
+       /**
+        * Requests the AccessExecutionGraph for the given jobId. If there is 
no such graph, then
+        * {@link Optional#empty()} is returned.
+        *
+        * @param jobId identifying the job whose AccessExecutionGraph is 
requested
+        * @param timeout for the asynchronous operation
+        * @return Future containing the AccessExecutionGraph for the given 
jobId, otherwise {@link Optional#empty()}
+        */
+       CompletableFuture<Optional<AccessExecutionGraph>> requestJob(JobID 
jobId, Time timeout);
+
+       /**
+        * Requests the status overview from the JobManager.
+        *
+        * @param timeout for the asynchronous operation
+        * @return Future containing the status overview
+        */
+       CompletableFuture<StatusOverview> requestStatusOverview(Time timeout);
+
+       /**
+        * Requests the job overview from the JobManager.
+        *
+        * @param timeout for the asynchronous operation
+        * @return Future containing the job overview
+        */
+       CompletableFuture<JobsWithIDsOverview> requestJobsOverview(Time 
timeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
index e173522..1791fe1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
@@ -25,6 +25,7 @@ import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.HistogramStatistics;
 import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Metric;
 import org.apache.flink.runtime.util.DataInputDeserializer;
 import org.apache.flink.runtime.util.DataOutputSerializer;
 import org.apache.flink.util.Preconditions;

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
index 9ebb126..9493696 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor;
 
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.core.fs.Path;
@@ -31,8 +32,9 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 
-import akka.actor.ActorSystem;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ArrayNode;
@@ -47,6 +49,7 @@ import java.net.URI;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.concurrent.Executor;
 
 /**
  * Utilities for the web runtime monitor. This class contains for example 
methods to build
@@ -119,27 +122,39 @@ public final class WebMonitorUtils {
         *
         * @param config The configuration for the runtime monitor.
         * @param highAvailabilityServices HighAvailabilityServices used to 
start the WebRuntimeMonitor
-        * @param actorSystem ActorSystem used to connect to the JobManager
-        *
+        * @param jobManagerRetriever which retrieves the currently leading 
JobManager
+        * @param queryServiceRetriever which retrieves the query service
+        * @param timeout for asynchronous operations
+        * @param executor to run asynchronous operations
         */
        public static WebMonitor startWebRuntimeMonitor(
                        Configuration config,
                        HighAvailabilityServices highAvailabilityServices,
-                       ActorSystem actorSystem) {
+                       JobManagerRetriever jobManagerRetriever,
+                       MetricQueryServiceRetriever queryServiceRetriever,
+                       Time timeout,
+                       Executor executor) {
                // try to load and instantiate the class
                try {
                        String classname = 
"org.apache.flink.runtime.webmonitor.WebRuntimeMonitor";
                        Class<? extends WebMonitor> clazz = 
Class.forName(classname).asSubclass(WebMonitor.class);
 
-                       Constructor<? extends WebMonitor> constructor = 
clazz.getConstructor(Configuration.class,
+                       Constructor<? extends WebMonitor> constructor = 
clazz.getConstructor(
+                               Configuration.class,
                                LeaderRetrievalService.class,
                                BlobView.class,
-                               ActorSystem.class);
+                               JobManagerRetriever.class,
+                               MetricQueryServiceRetriever.class,
+                               Time.class,
+                               Executor.class);
                        return constructor.newInstance(
                                config,
                                
highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
                                highAvailabilityServices.createBlobStore(),
-                               actorSystem);
+                               jobManagerRetriever,
+                               queryServiceRetriever,
+                               timeout,
+                               executor);
                } catch (ClassNotFoundException e) {
                        LOG.error("Could not load web runtime monitor. " +
                                        "Probably reason: flink-runtime-web is 
not in the classpath");

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/JobManagerRetriever.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/JobManagerRetriever.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/JobManagerRetriever.java
new file mode 100644
index 0000000..2eade48
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/JobManagerRetriever.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.retriever;
+
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Retrieves and stores the JobManagerGateway for the current leading 
JobManager.
+ */
+public abstract class JobManagerRetriever implements LeaderRetrievalListener {
+
+       private final Logger log = LoggerFactory.getLogger(getClass());
+
+       // False if we have to create a new JobManagerGateway future when being 
notified
+       // about a new leader address
+       private final AtomicBoolean firstTimeUsage;
+
+       private volatile CompletableFuture<JobManagerGateway> 
jobManagerGatewayFuture;
+
+       public JobManagerRetriever() {
+               firstTimeUsage = new AtomicBoolean(true);
+               jobManagerGatewayFuture = new CompletableFuture<>();
+       }
+
+       /**
+        * Returns the currently known leading job manager gateway and its web 
monitor port.
+        */
+       public Optional<JobManagerGateway> getJobManagerGatewayNow() throws 
Exception {
+               if (jobManagerGatewayFuture != null) {
+                       CompletableFuture<JobManagerGateway> 
jobManagerGatewayFuture = this.jobManagerGatewayFuture;
+
+                       if (jobManagerGatewayFuture.isDone()) {
+                               return 
Optional.of(jobManagerGatewayFuture.get());
+                       } else {
+                               return Optional.empty();
+                       }
+               } else {
+                       return Optional.empty();
+               }
+       }
+
+       /**
+        * Returns the current JobManagerGateway future.
+        */
+       public CompletableFuture<JobManagerGateway> getJobManagerGateway() 
throws Exception {
+               return jobManagerGatewayFuture;
+       }
+
+       @Override
+       public void notifyLeaderAddress(final String leaderAddress, final UUID 
leaderSessionID) {
+               if (leaderAddress != null && !leaderAddress.equals("")) {
+                       try {
+                               final CompletableFuture<JobManagerGateway> 
newJobManagerGatewayFuture;
+
+                               if (firstTimeUsage.compareAndSet(true, false)) {
+                                       newJobManagerGatewayFuture = 
jobManagerGatewayFuture;
+                               } else {
+                                       newJobManagerGatewayFuture = new 
CompletableFuture<>();
+                                       jobManagerGatewayFuture = 
newJobManagerGatewayFuture;
+                               }
+
+                               log.info("New leader reachable under {}:{}.", 
leaderAddress, leaderSessionID);
+
+                               createJobManagerGateway(leaderAddress, 
leaderSessionID).whenComplete(
+                                       (JobManagerGateway jobManagerGateway, 
Throwable throwable) -> {
+                                               if (throwable != null) {
+                                                       
newJobManagerGatewayFuture.completeExceptionally(new FlinkException("Could not 
retrieve" +
+                                                               "the current 
job manager gateway.", throwable));
+                                               } else {
+                                                       
newJobManagerGatewayFuture.complete(jobManagerGateway);
+                                               }
+                                       }
+                               );
+                       }
+                       catch (Exception e) {
+                               handleError(e);
+                       }
+               }
+       }
+
+       @Override
+       public void handleError(Exception exception) {
+               log.error("Received error from LeaderRetrievalService.", 
exception);
+
+               jobManagerGatewayFuture.completeExceptionally(exception);
+       }
+
+       /**
+        * Create a JobManagerGateway for the given leader address and leader 
id.
+        *
+        * @param leaderAddress to connect against
+        * @param leaderId the endpoint currently uses
+        * @return Future containing the resolved JobManagerGateway
+        * @throws Exception if the JobManagerGateway creation failed
+        */
+       protected abstract CompletableFuture<JobManagerGateway> 
createJobManagerGateway(String leaderAddress, UUID leaderId) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceGateway.java
new file mode 100644
index 0000000..c79bf5d
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceGateway.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.retriever;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
+import 
org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceGateway;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Gateway to communicate with a QueryService.
+ *
+ * <p>Currently there is only one implementation working with a Akka based
+ * MetricQueryService {@link AkkaQueryServiceGateway}.
+ */
+public interface MetricQueryServiceGateway {
+
+       CompletableFuture<MetricDumpSerialization.MetricSerializationResult> 
queryMetrics(Time timeout);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceRetriever.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceRetriever.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceRetriever.java
new file mode 100644
index 0000000..7bb9b44
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceRetriever.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.retriever;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Retriever for {@link MetricQueryServiceGateway}.
+ */
+public interface MetricQueryServiceRetriever {
+
+       /**
+        * Retrieves for the given query service path a {@link 
MetricQueryServiceGateway}.
+        *
+        * @param queryServicePath under which the QueryService can be reached
+        * @return Future containing the resolved QueryServiceGateway
+        */
+       CompletableFuture<MetricQueryServiceGateway> retrieveService(String 
queryServicePath);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetriever.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetriever.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetriever.java
new file mode 100644
index 0000000..027b42a
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetriever.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.retriever.impl;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.util.Preconditions;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * {@link JobManagerRetriever} implementation for Akka based JobManagers.
+ */
+public class AkkaJobManagerRetriever extends JobManagerRetriever {
+
+       private final ActorSystem actorSystem;
+       private final Time timeout;
+
+       public AkkaJobManagerRetriever(
+                       ActorSystem actorSystem,
+                       Time timeout) {
+
+               this.actorSystem = Preconditions.checkNotNull(actorSystem);
+               this.timeout = Preconditions.checkNotNull(timeout);
+       }
+
+       @Override
+       protected CompletableFuture<JobManagerGateway> 
createJobManagerGateway(String leaderAddress, UUID leaderId) throws Exception {
+               return FutureUtils.toJava(
+                       AkkaUtils.getActorRefFuture(
+                               leaderAddress,
+                               actorSystem,
+                               FutureUtils.toFiniteDuration(timeout)))
+                       .thenApplyAsync(
+                               (ActorRef jobManagerRef) -> {
+                                       ActorGateway leaderGateway = new 
AkkaActorGateway(
+                                               jobManagerRef, leaderId);
+
+                                       return new 
AkkaJobManagerGateway(leaderGateway);
+                               },
+                               actorSystem.dispatcher());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaQueryServiceGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaQueryServiceGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaQueryServiceGateway.java
new file mode 100644
index 0000000..8985205
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaQueryServiceGateway.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.retriever.impl;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
+import org.apache.flink.runtime.metrics.dump.MetricQueryService;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway;
+import org.apache.flink.util.Preconditions;
+
+import akka.actor.ActorRef;
+import akka.pattern.Patterns;
+
+import java.util.concurrent.CompletableFuture;
+
+import scala.reflect.ClassTag$;
+
+/**
+ * {@link MetricQueryServiceGateway} implementation for Akka based {@link 
MetricQueryService}.
+ */
+public class AkkaQueryServiceGateway implements MetricQueryServiceGateway {
+
+       private final ActorRef queryServiceActorRef;
+
+       public AkkaQueryServiceGateway(ActorRef queryServiceActorRef) {
+               this.queryServiceActorRef = 
Preconditions.checkNotNull(queryServiceActorRef);
+       }
+
+       @Override
+       public 
CompletableFuture<MetricDumpSerialization.MetricSerializationResult> 
queryMetrics(Time timeout) {
+               return FutureUtils.toJava(
+                       Patterns.ask(queryServiceActorRef, 
MetricQueryService.getCreateDump(), timeout.toMilliseconds())
+                               
.mapTo(ClassTag$.MODULE$.apply(MetricDumpSerialization.MetricSerializationResult.class))
+               );
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaQueryServiceRetriever.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaQueryServiceRetriever.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaQueryServiceRetriever.java
new file mode 100644
index 0000000..7de436a
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaQueryServiceRetriever.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.retriever.impl;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.metrics.dump.MetricQueryService;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway;
+import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+import org.apache.flink.util.Preconditions;
+
+import akka.actor.ActorSelection;
+import akka.actor.ActorSystem;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * {@link MetricQueryServiceRetriever} implementation for Akka based {@link 
MetricQueryService}.
+ */
+public class AkkaQueryServiceRetriever implements MetricQueryServiceRetriever {
+       private final ActorSystem actorSystem;
+       private final Time lookupTimeout;
+
+       public AkkaQueryServiceRetriever(ActorSystem actorSystem, Time 
lookupTimeout) {
+               this.actorSystem = Preconditions.checkNotNull(actorSystem);
+               this.lookupTimeout = Preconditions.checkNotNull(lookupTimeout);
+       }
+
+       @Override
+       public CompletableFuture<MetricQueryServiceGateway> 
retrieveService(String queryServicePath) {
+               ActorSelection selection = 
actorSystem.actorSelection(queryServicePath);
+
+               return 
FutureUtils.toJava(selection.resolveOne(FutureUtils.toFiniteDuration(lookupTimeout))).thenApply(AkkaQueryServiceGateway::new);
+       }
+}

Reply via email to