[FLINK-8450] [flip6] Make JobMaster/DispatcherGateway#requestJob type safe

Let JobMasterGateway#requestJob and DispatcherGateway#requestJob return a
CompletableFuture<ArchivedExecutionGraph> instead of a
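CompletableFuture<AccessExecutionGraph>. In order to support the old code
and the JobManagerGateway implementation, RestfulGateway#requestJob cannot
be narrowed in the same way: it is declared to return the more general
CompletableFuture<? extends AccessExecutionGraph>, which the two gateways
above override with the ArchivedExecutionGraph variant. Once the old code
has been removed, we should tighten the RestfulGateway return type as well.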

This closes #5309.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/60b7b03f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/60b7b03f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/60b7b03f

Branch: refs/heads/master
Commit: 60b7b03f45aeb5a31202b014e486c40116124b30
Parents: dd14bdf
Author: Till Rohrmann <[email protected]>
Authored: Wed Jan 17 15:01:57 2018 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Fri Jan 26 13:49:58 2018 +0100

----------------------------------------------------------------------
 .../flink/runtime/dispatcher/Dispatcher.java    |   4 +-
 .../runtime/dispatcher/DispatcherGateway.java   |  16 ++
 .../flink/runtime/jobmaster/JobMaster.java      |  10 +-
 .../runtime/jobmaster/JobMasterGateway.java     |  29 ++-
 .../handler/legacy/ExecutionGraphCache.java     |   2 +-
 .../runtime/webmonitor/RestfulGateway.java      |   4 +-
 .../handler/legacy/ExecutionGraphCacheTest.java | 214 +++++++++++--------
 .../webmonitor/TestingRestfulGateway.java       | 204 ++++++++++++++++++
 8 files changed, 374 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/60b7b03f/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 0271913..a5c8961 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -331,13 +331,13 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
        }
 
        @Override
-       public CompletableFuture<AccessExecutionGraph> requestJob(JobID jobId, 
Time timeout) {
+       public CompletableFuture<ArchivedExecutionGraph> requestJob(JobID 
jobId, Time timeout) {
                final JobManagerRunner jobManagerRunner = 
jobManagerRunners.get(jobId);
 
                if (jobManagerRunner == null) {
                        return FutureUtils.completedExceptionally(new 
FlinkJobNotFoundException(jobId));
                } else {
-                       return 
jobManagerRunner.getJobManagerGateway().requestArchivedExecutionGraph(timeout);
+                       return 
jobManagerRunner.getJobManagerGateway().requestJob(jobId, timeout);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/60b7b03f/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
index 12cbbfb..9676429 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
@@ -20,8 +20,10 @@ package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.rpc.FencedRpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
@@ -79,4 +81,18 @@ public interface DispatcherGateway extends 
FencedRpcGateway<DispatcherId>, Restf
         * @return A future integer of the blob server port
         */
        CompletableFuture<Integer> getBlobServerPort(@RpcTimeout Time timeout);
+
+       /**
+        * Requests the {@link ArchivedExecutionGraph} for the given jobId. If 
there is no such graph, then
+        * the future is completed with a {@link FlinkJobNotFoundException}.
+        *
+        * <p>Note: We enforce that the returned future contains a {@link 
ArchivedExecutionGraph} unlike
+        * the super interface.
+        *
+        * @param jobId identifying the job whose AccessExecutionGraph is 
requested
+        * @param timeout for the asynchronous operation
+        * @return Future containing the AccessExecutionGraph for the given 
jobId, otherwise {@link FlinkJobNotFoundException}
+        */
+       @Override
+       CompletableFuture<ArchivedExecutionGraph> requestJob(JobID jobId, 
@RpcTimeout Time timeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/60b7b03f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index ef99d52..0c8ee16 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -39,7 +39,6 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -784,11 +783,6 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
        }
 
        @Override
-       public CompletableFuture<AccessExecutionGraph> 
requestArchivedExecutionGraph(Time timeout) {
-               return 
CompletableFuture.completedFuture(ArchivedExecutionGraph.createFrom(executionGraph));
-       }
-
-       @Override
        public CompletableFuture<JobStatus> requestJobStatus(Time timeout) {
                return 
CompletableFuture.completedFuture(executionGraph.getState());
        }
@@ -803,9 +797,9 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
        }
 
        @Override
-       public CompletableFuture<AccessExecutionGraph> requestJob(JobID jobId, 
Time timeout) {
+       public CompletableFuture<ArchivedExecutionGraph> requestJob(JobID 
jobId, Time timeout) {
                if (jobGraph.getJobID().equals(jobId)) {
-                       return requestArchivedExecutionGraph(timeout);
+                       return 
CompletableFuture.completedFuture(ArchivedExecutionGraph.createFrom(executionGraph));
                } else {
                        return FutureUtils.completedExceptionally(new 
FlinkJobNotFoundException(jobId));
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/60b7b03f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 0d896ac..b811531 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -18,13 +18,13 @@
 
 package org.apache.flink.runtime.jobmaster;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorGateway;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.registration.RegistrationResponse;
@@ -54,7 +55,7 @@ import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 
 /**
- * {@link JobMaster} rpc gateway interface
+ * {@link JobMaster} rpc gateway interface.
  */
 public interface JobMasterGateway extends CheckpointCoordinatorGateway, 
FencedRpcGateway<JobMasterId>, RestfulGateway {
 
@@ -224,14 +225,14 @@ public interface JobMasterGateway extends 
CheckpointCoordinatorGateway, FencedRp
                        @RpcTimeout final Time timeout);
 
        /**
-        * Sends the heartbeat to job manager from task manager
+        * Sends the heartbeat to job manager from task manager.
         *
         * @param resourceID unique id of the task manager
         */
        void heartbeatFromTaskManager(final ResourceID resourceID);
 
        /**
-        * Sends heartbeat request from the resource manager
+        * Sends heartbeat request from the resource manager.
         *
         * @param resourceID unique id of the resource manager
         */
@@ -246,18 +247,24 @@ public interface JobMasterGateway extends 
CheckpointCoordinatorGateway, FencedRp
        CompletableFuture<JobDetails> requestJobDetails(@RpcTimeout Time 
timeout);
 
        /**
-        * Request the {@link ArchivedExecutionGraph} of the currently executed 
job.
+        * Requests the current job status.
         *
         * @param timeout for the rpc call
-        * @return Future archived execution graph derived from the currently 
executed job
+        * @return Future containing the current job status
         */
-       CompletableFuture<AccessExecutionGraph> 
requestArchivedExecutionGraph(@RpcTimeout Time timeout);
+       CompletableFuture<JobStatus> requestJobStatus(@RpcTimeout Time timeout);
 
        /**
-        * Requests the current job status.
+        * Requests the {@link ArchivedExecutionGraph} for the given jobId. If 
there is no such graph, then
+        * the future is completed with a {@link FlinkJobNotFoundException}.
         *
-        * @param timeout for the rpc call
-        * @return Future containing the current job status
+        * <p>Note: We enforce that the returned future contains a {@link 
ArchivedExecutionGraph} unlike
+        * the super interface.
+        *
+        * @param jobId identifying the job whose AccessExecutionGraph is 
requested
+        * @param timeout for the asynchronous operation
+        * @return Future containing the AccessExecutionGraph for the given 
jobId, otherwise {@link FlinkJobNotFoundException}
         */
-       CompletableFuture<JobStatus> requestJobStatus(@RpcTimeout Time timeout);
+       @Override
+       CompletableFuture<ArchivedExecutionGraph> requestJob(JobID jobId, 
@RpcTimeout Time timeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/60b7b03f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java
index f63b042..19186c4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java
@@ -122,7 +122,7 @@ public class ExecutionGraphCache implements Closeable {
                        }
 
                        if (successfulUpdate) {
-                               final CompletableFuture<AccessExecutionGraph> 
executionGraphFuture = restfulGateway.requestJob(jobId, timeout);
+                               final CompletableFuture<? extends 
AccessExecutionGraph> executionGraphFuture = restfulGateway.requestJob(jobId, 
timeout);
 
                                executionGraphFuture.whenComplete(
                                        (AccessExecutionGraph executionGraph, 
Throwable throwable) -> {

http://git-wip-us.apache.org/repos/asf/flink/blob/60b7b03f/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
index 4160434..4da3947 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
@@ -54,14 +54,14 @@ public interface RestfulGateway extends RpcGateway {
        CompletableFuture<String> requestRestAddress(@RpcTimeout  Time timeout);
 
        /**
-        * Requests the AccessExecutionGraph for the given jobId. If there is 
no such graph, then
+        * Requests the {@link AccessExecutionGraph} for the given jobId. If 
there is no such graph, then
         * the future is completed with a {@link FlinkJobNotFoundException}.
         *
         * @param jobId identifying the job whose AccessExecutionGraph is 
requested
         * @param timeout for the asynchronous operation
         * @return Future containing the AccessExecutionGraph for the given 
jobId, otherwise {@link FlinkJobNotFoundException}
         */
-       CompletableFuture<AccessExecutionGraph> requestJob(JobID jobId, 
@RpcTimeout Time timeout);
+       CompletableFuture<? extends AccessExecutionGraph> requestJob(JobID 
jobId, @RpcTimeout Time timeout);
 
        /**
         * Requests job details currently being executed on the Flink cluster.

http://git-wip-us.apache.org/repos/asf/flink/blob/60b7b03f/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java
index ecadaa5..8bdaff5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java
@@ -30,38 +30,49 @@ import org.apache.flink.runtime.executiongraph.ErrorInfo;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
 import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
+import org.hamcrest.Matchers;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
 /**
  * Tests for the {@link ExecutionGraphCache}.
  */
 public class ExecutionGraphCacheTest extends TestLogger {
 
+       private static ArchivedExecutionGraph expectedExecutionGraph;
+       private static final JobID expectedJobId = new JobID();
+
+       @BeforeClass
+       public static void setup() {
+               expectedExecutionGraph = new 
ArchivedExecutionGraphBuilder().build();
+       }
+
        /**
         * Tests that we can cache AccessExecutionGraphs over multiple accesses.
         */
@@ -69,23 +80,19 @@ public class ExecutionGraphCacheTest extends TestLogger {
        public void testExecutionGraphCaching() throws Exception {
                final Time timeout = Time.milliseconds(100L);
                final Time timeToLive = Time.hours(1L);
-               final JobID jobId = new JobID();
-               final AccessExecutionGraph accessExecutionGraph = 
mock(AccessExecutionGraph.class);
 
-               final JobManagerGateway jobManagerGateway = 
mock(JobManagerGateway.class);
-               when(jobManagerGateway.requestJob(eq(jobId), 
any(Time.class))).thenReturn(CompletableFuture.completedFuture(accessExecutionGraph));
+               final CountingRestfulGateway restfulGateway = 
createCountingRestfulGateway(expectedJobId, 
CompletableFuture.completedFuture(expectedExecutionGraph));
 
                try (ExecutionGraphCache executionGraphCache = new 
ExecutionGraphCache(timeout, timeToLive)) {
-                       CompletableFuture<AccessExecutionGraph> 
accessExecutionGraphFuture = executionGraphCache.getExecutionGraph(jobId, 
jobManagerGateway);
+                       CompletableFuture<AccessExecutionGraph> 
accessExecutionGraphFuture = 
executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);
 
-                       assertEquals(accessExecutionGraph, 
accessExecutionGraphFuture.get());
+                       assertEquals(expectedExecutionGraph, 
accessExecutionGraphFuture.get());
 
-                       CompletableFuture<AccessExecutionGraph> 
accessExecutionGraphFuture2 = executionGraphCache.getExecutionGraph(jobId, 
jobManagerGateway);
+                       accessExecutionGraphFuture = 
executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);
 
-                       assertEquals(accessExecutionGraph, 
accessExecutionGraphFuture2.get());
+                       assertEquals(expectedExecutionGraph, 
accessExecutionGraphFuture.get());
 
-                       // verify that we only issued a single request to the 
gateway
-                       verify(jobManagerGateway, 
times(1)).requestJob(eq(jobId), any(Time.class));
+                       assertThat(restfulGateway.getNumRequestJobCalls(), 
Matchers.equalTo(1));
                }
        }
 
@@ -96,25 +103,25 @@ public class ExecutionGraphCacheTest extends TestLogger {
        public void testExecutionGraphEntryInvalidation() throws Exception {
                final Time timeout = Time.milliseconds(100L);
                final Time timeToLive = Time.milliseconds(1L);
-               final JobID jobId = new JobID();
-               final AccessExecutionGraph accessExecutionGraph = 
mock(AccessExecutionGraph.class);
 
-               final JobManagerGateway jobManagerGateway = 
mock(JobManagerGateway.class);
-               when(jobManagerGateway.requestJob(eq(jobId), 
any(Time.class))).thenReturn(CompletableFuture.completedFuture(accessExecutionGraph));
+               final CountingRestfulGateway restfulGateway = 
createCountingRestfulGateway(
+                       expectedJobId,
+                       
CompletableFuture.completedFuture(expectedExecutionGraph),
+                       
CompletableFuture.completedFuture(expectedExecutionGraph));
 
                try (ExecutionGraphCache executionGraphCache = new 
ExecutionGraphCache(timeout, timeToLive)) {
-                       CompletableFuture<AccessExecutionGraph> 
executionGraphFuture = executionGraphCache.getExecutionGraph(jobId, 
jobManagerGateway);
+                       CompletableFuture<AccessExecutionGraph> 
executionGraphFuture = executionGraphCache.getExecutionGraph(expectedJobId, 
restfulGateway);
 
-                       assertEquals(accessExecutionGraph, 
executionGraphFuture.get());
+                       assertEquals(expectedExecutionGraph, 
executionGraphFuture.get());
 
                        // sleep for the TTL
-                       Thread.sleep(timeToLive.toMilliseconds());
+                       Thread.sleep(timeToLive.toMilliseconds() * 5L);
 
-                       CompletableFuture<AccessExecutionGraph> 
executionGraphFuture2 = executionGraphCache.getExecutionGraph(jobId, 
jobManagerGateway);
+                       CompletableFuture<AccessExecutionGraph> 
executionGraphFuture2 = executionGraphCache.getExecutionGraph(expectedJobId, 
restfulGateway);
 
-                       assertEquals(accessExecutionGraph, 
executionGraphFuture2.get());
+                       assertEquals(expectedExecutionGraph, 
executionGraphFuture2.get());
 
-                       verify(jobManagerGateway, 
times(2)).requestJob(eq(jobId), any(Time.class));
+                       assertThat(restfulGateway.getNumRequestJobCalls(), 
Matchers.equalTo(2));
                }
        }
 
@@ -127,18 +134,15 @@ public class ExecutionGraphCacheTest extends TestLogger {
        public void testImmediateCacheInvalidationAfterFailure() throws 
Exception {
                final Time timeout = Time.milliseconds(100L);
                final Time timeToLive = Time.hours(1L);
-               final JobID jobId = new JobID();
 
-               final AccessExecutionGraph accessExecutionGraph = 
mock(AccessExecutionGraph.class);
-
-               final JobManagerGateway jobManagerGateway = 
mock(JobManagerGateway.class);
                // let's first answer with a JobNotFoundException and then only 
with the correct result
-               when(jobManagerGateway.requestJob(eq(jobId), 
any(Time.class))).thenReturn(
-                       FutureUtils.completedExceptionally(new 
FlinkJobNotFoundException(jobId)),
-                       
CompletableFuture.completedFuture(accessExecutionGraph));
+               final CountingRestfulGateway restfulGateway = 
createCountingRestfulGateway(
+                       expectedJobId,
+                       FutureUtils.completedExceptionally(new 
FlinkJobNotFoundException(expectedJobId)),
+                       
CompletableFuture.completedFuture(expectedExecutionGraph));
 
                try (ExecutionGraphCache executionGraphCache = new 
ExecutionGraphCache(timeout, timeToLive)) {
-                       CompletableFuture<AccessExecutionGraph> 
executionGraphFuture = executionGraphCache.getExecutionGraph(jobId, 
jobManagerGateway);
+                       CompletableFuture<AccessExecutionGraph> 
executionGraphFuture = executionGraphCache.getExecutionGraph(expectedJobId, 
restfulGateway);
 
                        try {
                                executionGraphFuture.get();
@@ -148,9 +152,9 @@ public class ExecutionGraphCacheTest extends TestLogger {
                                assertTrue(ee.getCause() instanceof 
FlinkException);
                        }
 
-                       CompletableFuture<AccessExecutionGraph> 
executionGraphFuture2 = executionGraphCache.getExecutionGraph(jobId, 
jobManagerGateway);
+                       CompletableFuture<AccessExecutionGraph> 
executionGraphFuture2 = executionGraphCache.getExecutionGraph(expectedJobId, 
restfulGateway);
 
-                       assertEquals(accessExecutionGraph, 
executionGraphFuture2.get());
+                       assertEquals(expectedExecutionGraph, 
executionGraphFuture2.get());
                }
        }
 
@@ -162,27 +166,36 @@ public class ExecutionGraphCacheTest extends TestLogger {
        public void testCacheEntryCleanup() throws Exception {
                final Time timeout = Time.milliseconds(100L);
                final Time timeToLive = Time.milliseconds(1L);
-               final JobID jobId1 = new JobID();
-               final JobID jobId2 = new JobID();
-               final AccessExecutionGraph accessExecutionGraph1 = 
mock(AccessExecutionGraph.class);
-               final AccessExecutionGraph accessExecutionGraph2 = 
mock(AccessExecutionGraph.class);
-
-               final JobManagerGateway jobManagerGateway = 
mock(JobManagerGateway.class);
-               when(jobManagerGateway.requestJob(eq(jobId1), 
any(Time.class))).thenReturn(CompletableFuture.completedFuture(accessExecutionGraph1));
-               when(jobManagerGateway.requestJob(eq(jobId2), 
any(Time.class))).thenReturn(CompletableFuture.completedFuture(accessExecutionGraph2));
+               final JobID expectedJobId2 = new JobID();
+               final ArchivedExecutionGraph expectedExecutionGraph2 = new 
ArchivedExecutionGraphBuilder().build();
+
+               final AtomicInteger requestJobCalls = new AtomicInteger(0);
+               final TestingRestfulGateway restfulGateway = 
TestingRestfulGateway.newBuilder()
+                       .setRequestJobFunction(
+                               jobId -> {
+                                       requestJobCalls.incrementAndGet();
+                                       if (jobId.equals(expectedJobId)) {
+                                               return 
CompletableFuture.completedFuture(expectedExecutionGraph);
+                                       } else if 
(jobId.equals(expectedJobId2)) {
+                                               return 
CompletableFuture.completedFuture(expectedExecutionGraph2);
+                                       } else {
+                                               throw new 
AssertionError("Invalid job id received.");
+                                       }
+                               }
+                       )
+                       .build();
 
                try (ExecutionGraphCache executionGraphCache = new 
ExecutionGraphCache(timeout, timeToLive)) {
 
-                       CompletableFuture<AccessExecutionGraph> 
executionGraph1Future = executionGraphCache.getExecutionGraph(jobId1, 
jobManagerGateway);
+                       CompletableFuture<AccessExecutionGraph> 
executionGraph1Future = executionGraphCache.getExecutionGraph(expectedJobId, 
restfulGateway);
 
-                       CompletableFuture<AccessExecutionGraph> 
executionGraph2Future = executionGraphCache.getExecutionGraph(jobId2, 
jobManagerGateway);
+                       CompletableFuture<AccessExecutionGraph> 
executionGraph2Future = executionGraphCache.getExecutionGraph(expectedJobId2, 
restfulGateway);
 
-                       assertEquals(accessExecutionGraph1, 
executionGraph1Future.get());
+                       assertEquals(expectedExecutionGraph, 
executionGraph1Future.get());
 
-                       assertEquals(accessExecutionGraph2, 
executionGraph2Future.get());
+                       assertEquals(expectedExecutionGraph2, 
executionGraph2Future.get());
 
-                       verify(jobManagerGateway, 
times(1)).requestJob(eq(jobId1), any(Time.class));
-                       verify(jobManagerGateway, 
times(1)).requestJob(eq(jobId2), any(Time.class));
+                       assertThat(requestJobCalls.get(), Matchers.equalTo(2));
 
                        Thread.sleep(timeToLive.toMilliseconds());
 
@@ -199,12 +212,8 @@ public class ExecutionGraphCacheTest extends TestLogger {
        public void testConcurrentAccess() throws Exception {
                final Time timeout = Time.milliseconds(100L);
                final Time timeToLive = Time.hours(1L);
-               final JobID jobId = new JobID();
-
-               final AccessExecutionGraph accessExecutionGraph = 
mock(AccessExecutionGraph.class);
 
-               final JobManagerGateway jobManagerGateway = 
mock(JobManagerGateway.class);
-               when(jobManagerGateway.requestJob(eq(jobId), 
any(Time.class))).thenReturn(CompletableFuture.completedFuture(accessExecutionGraph));
+               final CountingRestfulGateway restfulGateway = 
createCountingRestfulGateway(expectedJobId, 
CompletableFuture.completedFuture(expectedExecutionGraph));
 
                final int numConcurrentAccesses = 10;
 
@@ -216,7 +225,7 @@ public class ExecutionGraphCacheTest extends TestLogger {
                        for (int i = 0; i < numConcurrentAccesses; i++) {
                                CompletableFuture<AccessExecutionGraph> 
executionGraphFuture = CompletableFuture
                                        .supplyAsync(
-                                               () -> 
executionGraphCache.getExecutionGraph(jobId, jobManagerGateway),
+                                               () -> 
executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway),
                                                executor)
                                        .thenCompose(Function.identity());
 
@@ -228,10 +237,10 @@ public class ExecutionGraphCacheTest extends TestLogger {
                        Collection<AccessExecutionGraph> allExecutionGraphs = 
allExecutionGraphFutures.get();
 
                        for (AccessExecutionGraph executionGraph : 
allExecutionGraphs) {
-                               assertEquals(accessExecutionGraph, 
executionGraph);
+                               assertEquals(expectedExecutionGraph, 
executionGraph);
                        }
 
-                       verify(jobManagerGateway, 
times(1)).requestJob(eq(jobId), any(Time.class));
+                       assertThat(restfulGateway.getNumRequestJobCalls(), 
Matchers.equalTo(1));
                } finally {
                        ExecutorUtils.gracefulShutdown(5000L, 
TimeUnit.MILLISECONDS, executor);
                }
@@ -248,27 +257,32 @@ public class ExecutionGraphCacheTest extends TestLogger {
        public void testCacheInvalidationIfSuspended() throws Exception {
                final Time timeout = Time.milliseconds(100L);
                final Time timeToLive = Time.hours(1L);
-               final JobID jobId = new JobID();
+               final JobID expectedJobId = new JobID();
 
-               final AccessExecutionGraph accessExecutionGraph = 
mock(AccessExecutionGraph.class);
+               final ArchivedExecutionGraph suspendedExecutionGraph = new 
ArchivedExecutionGraphBuilder().setState(JobStatus.SUSPENDED).build();
+               final ConcurrentLinkedQueue<CompletableFuture<? extends 
AccessExecutionGraph>> requestJobAnswers = new ConcurrentLinkedQueue<>();
 
-               final AccessExecutionGraph suspendedExecutionGraph = 
mock(AccessExecutionGraph.class);
-               
when(suspendedExecutionGraph.getState()).thenReturn(JobStatus.SUSPENDED);
+               
requestJobAnswers.offer(CompletableFuture.completedFuture(suspendedExecutionGraph));
+               
requestJobAnswers.offer(CompletableFuture.completedFuture(expectedExecutionGraph));
 
-               final JobManagerGateway jobManagerGateway = 
mock(JobManagerGateway.class);
-               // let's first answer with a suspended ExecutionGraph and then only with the correct result
-               when(jobManagerGateway.requestJob(eq(jobId), 
any(Time.class))).thenReturn(
-                       
CompletableFuture.completedFuture(suspendedExecutionGraph),
-                       
CompletableFuture.completedFuture(accessExecutionGraph));
+               final TestingRestfulGateway restfulGateway = 
TestingRestfulGateway.newBuilder()
+                       .setRequestJobFunction(
+                               jobId -> {
+                                       assertThat(jobId, 
Matchers.equalTo(expectedJobId));
+
+                                       return requestJobAnswers.poll();
+                               }
+                       )
+                       .build();
 
                try (ExecutionGraphCache executionGraphCache = new 
ExecutionGraphCache(timeout, timeToLive)) {
-                       CompletableFuture<AccessExecutionGraph> 
executionGraphFuture = executionGraphCache.getExecutionGraph(jobId, 
jobManagerGateway);
+                       CompletableFuture<AccessExecutionGraph> 
executionGraphFuture = executionGraphCache.getExecutionGraph(expectedJobId, 
restfulGateway);
 
                        assertEquals(suspendedExecutionGraph, 
executionGraphFuture.get());
 
-                       CompletableFuture<AccessExecutionGraph> 
executionGraphFuture2 = executionGraphCache.getExecutionGraph(jobId, 
jobManagerGateway);
+                       CompletableFuture<AccessExecutionGraph> 
executionGraphFuture2 = executionGraphCache.getExecutionGraph(expectedJobId, 
restfulGateway);
 
-                       assertEquals(accessExecutionGraph, 
executionGraphFuture2.get());
+                       assertEquals(expectedExecutionGraph, 
executionGraphFuture2.get());
                }
        }
 
@@ -283,35 +297,65 @@ public class ExecutionGraphCacheTest extends TestLogger {
        public void testCacheInvalidationIfSwitchToSuspended() throws Exception 
{
                final Time timeout = Time.milliseconds(100L);
                final Time timeToLive = Time.hours(1L);
-               final JobID jobId = new JobID();
-
-               final AccessExecutionGraph accessExecutionGraph = 
mock(AccessExecutionGraph.class);
+               final JobID expectedJobId = new JobID();
 
-               final SuspendableAccessExecutionGraph 
toBeSuspendedExecutionGraph = new SuspendableAccessExecutionGraph(jobId);
+               final SuspendableAccessExecutionGraph 
toBeSuspendedExecutionGraph = new 
SuspendableAccessExecutionGraph(expectedJobId);
 
-               final JobManagerGateway jobManagerGateway = 
mock(JobManagerGateway.class);
-               // let's first answer with a JobNotFoundException and then only with the correct result
-               when(jobManagerGateway.requestJob(eq(jobId), 
any(Time.class))).thenReturn(
+               final CountingRestfulGateway restfulGateway = 
createCountingRestfulGateway(
+                       expectedJobId,
                        
CompletableFuture.completedFuture(toBeSuspendedExecutionGraph),
-                       
CompletableFuture.completedFuture(accessExecutionGraph));
+                       
CompletableFuture.completedFuture(expectedExecutionGraph));
 
                try (ExecutionGraphCache executionGraphCache = new 
ExecutionGraphCache(timeout, timeToLive)) {
-                       CompletableFuture<AccessExecutionGraph> 
executionGraphFuture = executionGraphCache.getExecutionGraph(jobId, 
jobManagerGateway);
+                       CompletableFuture<AccessExecutionGraph> 
executionGraphFuture = executionGraphCache.getExecutionGraph(expectedJobId, 
restfulGateway);
 
                        assertEquals(toBeSuspendedExecutionGraph, 
executionGraphFuture.get());
 
                        
toBeSuspendedExecutionGraph.setJobStatus(JobStatus.SUSPENDED);
 
                        // retrieve the same job from the cache again --> this should return it and invalidate the cache entry
-                       CompletableFuture<AccessExecutionGraph> 
executionGraphFuture2 = executionGraphCache.getExecutionGraph(jobId, 
jobManagerGateway);
+                       CompletableFuture<AccessExecutionGraph> 
executionGraphFuture2 = executionGraphCache.getExecutionGraph(expectedJobId, 
restfulGateway);
 
-                       assertEquals(accessExecutionGraph, 
executionGraphFuture2.get());
+                       assertEquals(expectedExecutionGraph, 
executionGraphFuture2.get());
 
-                       CompletableFuture<AccessExecutionGraph> 
executionGraphFuture3 = executionGraphCache.getExecutionGraph(jobId, 
jobManagerGateway);
+                       CompletableFuture<AccessExecutionGraph> 
executionGraphFuture3 = executionGraphCache.getExecutionGraph(expectedJobId, 
restfulGateway);
 
-                       assertEquals(accessExecutionGraph, 
executionGraphFuture3.get());
+                       assertEquals(expectedExecutionGraph, 
executionGraphFuture3.get());
+
+                       assertThat(restfulGateway.getNumRequestJobCalls(), 
Matchers.equalTo(2));
+               }
+       }
+
+       private CountingRestfulGateway createCountingRestfulGateway(JobID 
jobId, CompletableFuture<? extends AccessExecutionGraph>... 
accessExecutionGraphs) {
+               final ConcurrentLinkedQueue<CompletableFuture<? extends 
AccessExecutionGraph>> queue = new 
ConcurrentLinkedQueue<>(Arrays.asList(accessExecutionGraphs));
+               return new CountingRestfulGateway(
+                       jobId,
+                       ignored -> queue.poll());
+       }
+
+       /**
+        * {@link RestfulGateway} implementation which counts the number of {@link #requestJob(JobID, Time)} calls.
+        */
+       private static class CountingRestfulGateway extends 
TestingRestfulGateway {
+
+               private final JobID expectedJobId;
+
+               private AtomicInteger numRequestJobCalls = new AtomicInteger(0);
+
+               private CountingRestfulGateway(JobID expectedJobId, 
Function<JobID, CompletableFuture<? extends AccessExecutionGraph>> 
requestJobFunction) {
+                       this.expectedJobId = 
Preconditions.checkNotNull(expectedJobId);
+                       this.requestJobFunction = 
Preconditions.checkNotNull(requestJobFunction);
+               }
+
+               @Override
+               public CompletableFuture<? extends AccessExecutionGraph> 
requestJob(JobID jobId, Time timeout) {
+                       assertThat(jobId, Matchers.equalTo(expectedJobId));
+                       numRequestJobCalls.incrementAndGet();
+                       return super.requestJob(jobId, timeout);
+               }
 
-                       verify(jobManagerGateway, 
times(2)).requestJob(eq(jobId), any(Time.class));
+               public int getNumRequestJobCalls() {
+                       return numRequestJobCalls.get();
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/60b7b03f/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
new file mode 100644
index 0000000..1e27d8e
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/**
+ * Testing implementation of the {@link RestfulGateway}.
+ */
+public class TestingRestfulGateway implements RestfulGateway {
+
+       static final Function<JobID, CompletableFuture<? extends 
AccessExecutionGraph>> DEFAULT_REQUEST_JOB_FUNCTION = jobId -> 
FutureUtils.completedExceptionally(new UnsupportedOperationException());
+       static final Supplier<CompletableFuture<MultipleJobsDetails>> 
DEFAULT_REQUEST_MULTIPLE_JOB_DETAILS_SUPPLIER = () -> 
CompletableFuture.completedFuture(new 
MultipleJobsDetails(Collections.emptyList()));
+       static final Supplier<CompletableFuture<ClusterOverview>> 
DEFAULT_REQUEST_CLUSTER_OVERVIEW_SUPPLIER = () -> 
CompletableFuture.completedFuture(new ClusterOverview(0, 0, 0, 0, 0, 0, 0));
+       static final Supplier<CompletableFuture<Collection<String>>> 
DEFAULT_REQUEST_METRIC_QUERY_SERVICE_PATHS_SUPPLIER = () -> 
CompletableFuture.completedFuture(Collections.emptyList());
+       static final Supplier<CompletableFuture<Collection<Tuple2<ResourceID, 
String>>>> DEFAULT_REQUEST_TASK_MANAGER_METRIC_QUERY_SERVICE_PATHS_SUPPLIER = 
() -> CompletableFuture.completedFuture(Collections.emptyList());
+       static final String LOCALHOST = "localhost";
+
+       protected String address;
+
+       protected String hostname;
+
+       protected String restAddress;
+
+       protected Function<JobID, CompletableFuture<? extends 
AccessExecutionGraph>> requestJobFunction;
+
+       protected Supplier<CompletableFuture<MultipleJobsDetails>> 
requestMultipleJobDetailsSupplier;
+
+       protected Supplier<CompletableFuture<ClusterOverview>> 
requestClusterOverviewSupplier;
+
+       protected Supplier<CompletableFuture<Collection<String>>> 
requestMetricQueryServicePathsSupplier;
+
+       protected Supplier<CompletableFuture<Collection<Tuple2<ResourceID, 
String>>>> requestTaskManagerMetricQueryServicePathsSupplier;
+
+       public TestingRestfulGateway() {
+               this(
+                       LOCALHOST,
+                       LOCALHOST,
+                       LOCALHOST,
+                       DEFAULT_REQUEST_JOB_FUNCTION,
+                       DEFAULT_REQUEST_MULTIPLE_JOB_DETAILS_SUPPLIER,
+                       DEFAULT_REQUEST_CLUSTER_OVERVIEW_SUPPLIER,
+                       DEFAULT_REQUEST_METRIC_QUERY_SERVICE_PATHS_SUPPLIER,
+                       
DEFAULT_REQUEST_TASK_MANAGER_METRIC_QUERY_SERVICE_PATHS_SUPPLIER);
+       }
+
+       public TestingRestfulGateway(
+                       String address,
+                       String hostname,
+                       String restAddress,
+                       Function<JobID, CompletableFuture<? extends 
AccessExecutionGraph>> requestJobFunction,
+                       Supplier<CompletableFuture<MultipleJobsDetails>> 
requestMultipleJobDetailsSupplier,
+                       Supplier<CompletableFuture<ClusterOverview>> 
requestClusterOverviewSupplier,
+                       Supplier<CompletableFuture<Collection<String>>> 
requestMetricQueryServicePathsSupplier,
+                       
Supplier<CompletableFuture<Collection<Tuple2<ResourceID, String>>>> 
requestTaskManagerMetricQueryServicePathsSupplier) {
+               this.address = address;
+               this.hostname = hostname;
+               this.restAddress = restAddress;
+               this.requestJobFunction = requestJobFunction;
+               this.requestMultipleJobDetailsSupplier = 
requestMultipleJobDetailsSupplier;
+               this.requestClusterOverviewSupplier = 
requestClusterOverviewSupplier;
+               this.requestMetricQueryServicePathsSupplier = 
requestMetricQueryServicePathsSupplier;
+               this.requestTaskManagerMetricQueryServicePathsSupplier = 
requestTaskManagerMetricQueryServicePathsSupplier;
+       }
+
+       @Override
+       public CompletableFuture<String> requestRestAddress(Time timeout) {
+               return CompletableFuture.completedFuture(restAddress);
+       }
+
+       @Override
+       public CompletableFuture<? extends AccessExecutionGraph> 
requestJob(JobID jobId, Time timeout) {
+               return requestJobFunction.apply(jobId);
+       }
+
+       @Override
+       public CompletableFuture<MultipleJobsDetails> 
requestMultipleJobDetails(Time timeout) {
+               return requestMultipleJobDetailsSupplier.get();
+       }
+
+       @Override
+       public CompletableFuture<ClusterOverview> requestClusterOverview(Time 
timeout) {
+               return requestClusterOverviewSupplier.get();
+       }
+
+       @Override
+       public CompletableFuture<Collection<String>> 
requestMetricQueryServicePaths(Time timeout) {
+               return requestMetricQueryServicePathsSupplier.get();
+       }
+
+       @Override
+       public CompletableFuture<Collection<Tuple2<ResourceID, String>>> 
requestTaskManagerMetricQueryServicePaths(Time timeout) {
+               return requestTaskManagerMetricQueryServicePathsSupplier.get();
+       }
+
+       @Override
+       public String getAddress() {
+               return address;
+       }
+
+       @Override
+       public String getHostname() {
+               return hostname;
+       }
+
+       public static Builder newBuilder() {
+               return new Builder();
+       }
+
+       /**
+        * Builder for the {@link TestingRestfulGateway}.
+        */
+       public static final class Builder {
+               private String address = LOCALHOST;
+               private String hostname = LOCALHOST;
+               private String restAddress = LOCALHOST;
+               private Function<JobID, CompletableFuture<? extends 
AccessExecutionGraph>> requestJobFunction;
+               private Supplier<CompletableFuture<MultipleJobsDetails>> 
requestMultipleJobDetailsSupplier;
+               private Supplier<CompletableFuture<ClusterOverview>> 
requestClusterOverviewSupplier;
+               private Supplier<CompletableFuture<Collection<String>>> 
requestMetricQueryServicePathsSupplier;
+               private 
Supplier<CompletableFuture<Collection<Tuple2<ResourceID, String>>>> 
requestTaskManagerMetricQueryServicePathsSupplier;
+
+               public Builder() {
+                       requestJobFunction = DEFAULT_REQUEST_JOB_FUNCTION;
+                       requestMultipleJobDetailsSupplier = 
DEFAULT_REQUEST_MULTIPLE_JOB_DETAILS_SUPPLIER;
+                       requestClusterOverviewSupplier = 
DEFAULT_REQUEST_CLUSTER_OVERVIEW_SUPPLIER;
+                       requestMetricQueryServicePathsSupplier = 
DEFAULT_REQUEST_METRIC_QUERY_SERVICE_PATHS_SUPPLIER;
+                       requestTaskManagerMetricQueryServicePathsSupplier = 
DEFAULT_REQUEST_TASK_MANAGER_METRIC_QUERY_SERVICE_PATHS_SUPPLIER;
+               }
+
+               public Builder setAddress(String address) {
+                       this.address = address;
+                       return this;
+               }
+
+               public Builder setHostname(String hostname) {
+                       this.hostname = hostname;
+                       return this;
+               }
+
+               public Builder setRestAddress(String restAddress) {
+                       this.restAddress = restAddress;
+                       return this;
+               }
+
+               public Builder setRequestJobFunction(Function<JobID, 
CompletableFuture<? extends AccessExecutionGraph>> requestJobFunction) {
+                       this.requestJobFunction = requestJobFunction;
+                       return this;
+               }
+
+               public Builder 
setRequestMultipleJobDetailsSupplier(Supplier<CompletableFuture<MultipleJobsDetails>>
 requestMultipleJobDetailsSupplier) {
+                       this.requestMultipleJobDetailsSupplier = 
requestMultipleJobDetailsSupplier;
+                       return this;
+               }
+
+               public Builder 
setRequestClusterOverviewSupplier(Supplier<CompletableFuture<ClusterOverview>> 
requestClusterOverviewSupplier) {
+                       this.requestClusterOverviewSupplier = 
requestClusterOverviewSupplier;
+                       return this;
+               }
+
+               public Builder 
setRequestMetricQueryServicePathsSupplier(Supplier<CompletableFuture<Collection<String>>>
 requestMetricQueryServicePathsSupplier) {
+                       this.requestMetricQueryServicePathsSupplier = 
requestMetricQueryServicePathsSupplier;
+                       return this;
+               }
+
+               public Builder 
setRequestTaskManagerMetricQueryServicePathsSupplier(Supplier<CompletableFuture<Collection<Tuple2<ResourceID,
 String>>>> requestTaskManagerMetricQueryServicePathsSupplier) {
+                       this.requestTaskManagerMetricQueryServicePathsSupplier 
= requestTaskManagerMetricQueryServicePathsSupplier;
+                       return this;
+               }
+
+               public TestingRestfulGateway build() {
+                       return new TestingRestfulGateway(address, hostname, 
restAddress, requestJobFunction, requestMultipleJobDetailsSupplier, 
requestClusterOverviewSupplier, requestMetricQueryServicePathsSupplier, 
requestTaskManagerMetricQueryServicePathsSupplier);
+               }
+       }
+}

Reply via email to