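[FLINK-8450] [flip6] Make JobMaster/DispatcherGateway#requestJob type safe

Let JobMasterGateway#requestJob and DispatcherGateway#requestJob return a CompletableFuture<ArchivedExecutionGraph> instead of a CompletableFuture<AccessExecutionGraph>. In order to support the old code and the JobManagerGateway implementation, RestfulGateway#requestJob has to keep exposing the more general AccessExecutionGraph; its return type is therefore relaxed to CompletableFuture<? extends AccessExecutionGraph> so that the two gateways can override it with the narrower type. Once the old code has been removed, the return type in RestfulGateway should be changed as well.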
This closes #5309. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/60b7b03f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/60b7b03f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/60b7b03f Branch: refs/heads/master Commit: 60b7b03f45aeb5a31202b014e486c40116124b30 Parents: dd14bdf Author: Till Rohrmann <[email protected]> Authored: Wed Jan 17 15:01:57 2018 +0100 Committer: Till Rohrmann <[email protected]> Committed: Fri Jan 26 13:49:58 2018 +0100 ---------------------------------------------------------------------- .../flink/runtime/dispatcher/Dispatcher.java | 4 +- .../runtime/dispatcher/DispatcherGateway.java | 16 ++ .../flink/runtime/jobmaster/JobMaster.java | 10 +- .../runtime/jobmaster/JobMasterGateway.java | 29 ++- .../handler/legacy/ExecutionGraphCache.java | 2 +- .../runtime/webmonitor/RestfulGateway.java | 4 +- .../handler/legacy/ExecutionGraphCacheTest.java | 214 +++++++++++-------- .../webmonitor/TestingRestfulGateway.java | 204 ++++++++++++++++++ 8 files changed, 374 insertions(+), 109 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/60b7b03f/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index 0271913..a5c8961 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -331,13 +331,13 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme } @Override - public CompletableFuture<AccessExecutionGraph> requestJob(JobID jobId, Time timeout) { + public 
CompletableFuture<ArchivedExecutionGraph> requestJob(JobID jobId, Time timeout) { final JobManagerRunner jobManagerRunner = jobManagerRunners.get(jobId); if (jobManagerRunner == null) { return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId)); } else { - return jobManagerRunner.getJobManagerGateway().requestArchivedExecutionGraph(timeout); + return jobManagerRunner.getJobManagerGateway().requestJob(jobId, timeout); } } http://git-wip-us.apache.org/repos/asf/flink/blob/60b7b03f/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java index 12cbbfb..9676429 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java @@ -20,8 +20,10 @@ package org.apache.flink.runtime.dispatcher; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.messages.FlinkJobNotFoundException; import org.apache.flink.runtime.rpc.FencedRpcGateway; import org.apache.flink.runtime.rpc.RpcTimeout; import org.apache.flink.runtime.webmonitor.RestfulGateway; @@ -79,4 +81,18 @@ public interface DispatcherGateway extends FencedRpcGateway<DispatcherId>, Restf * @return A future integer of the blob server port */ CompletableFuture<Integer> getBlobServerPort(@RpcTimeout Time timeout); + + /** + * Requests the {@link ArchivedExecutionGraph} for the given jobId. 
If there is no such graph, then + * the future is completed with a {@link FlinkJobNotFoundException}. + * + * <p>Note: We enforce that the returned future contains a {@link ArchivedExecutionGraph} unlike + * the super interface. + * + * @param jobId identifying the job whose AccessExecutionGraph is requested + * @param timeout for the asynchronous operation + * @return Future containing the AccessExecutionGraph for the given jobId, otherwise {@link FlinkJobNotFoundException} + */ + @Override + CompletableFuture<ArchivedExecutionGraph> requestJob(JobID jobId, @RpcTimeout Time timeout); } http://git-wip-us.apache.org/repos/asf/flink/blob/60b7b03f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index ef99d52..0c8ee16 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -39,7 +39,6 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; -import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; @@ -784,11 +783,6 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast } @Override - public CompletableFuture<AccessExecutionGraph> requestArchivedExecutionGraph(Time timeout) { - return 
CompletableFuture.completedFuture(ArchivedExecutionGraph.createFrom(executionGraph)); - } - - @Override public CompletableFuture<JobStatus> requestJobStatus(Time timeout) { return CompletableFuture.completedFuture(executionGraph.getState()); } @@ -803,9 +797,9 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast } @Override - public CompletableFuture<AccessExecutionGraph> requestJob(JobID jobId, Time timeout) { + public CompletableFuture<ArchivedExecutionGraph> requestJob(JobID jobId, Time timeout) { if (jobGraph.getJobID().equals(jobId)) { - return requestArchivedExecutionGraph(timeout); + return CompletableFuture.completedFuture(ArchivedExecutionGraph.createFrom(executionGraph)); } else { return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId)); } http://git-wip-us.apache.org/repos/asf/flink/blob/60b7b03f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java index 0d896ac..b811531 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java @@ -18,13 +18,13 @@ package org.apache.flink.runtime.jobmaster; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.queryablestate.KvStateID; import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorGateway; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import 
org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; @@ -36,6 +36,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmaster.message.ClassloadingProps; import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.messages.FlinkJobNotFoundException; import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.query.KvStateLocation; import org.apache.flink.runtime.registration.RegistrationResponse; @@ -54,7 +55,7 @@ import java.util.Collection; import java.util.concurrent.CompletableFuture; /** - * {@link JobMaster} rpc gateway interface + * {@link JobMaster} rpc gateway interface. */ public interface JobMasterGateway extends CheckpointCoordinatorGateway, FencedRpcGateway<JobMasterId>, RestfulGateway { @@ -224,14 +225,14 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway, FencedRp @RpcTimeout final Time timeout); /** - * Sends the heartbeat to job manager from task manager + * Sends the heartbeat to job manager from task manager. * * @param resourceID unique id of the task manager */ void heartbeatFromTaskManager(final ResourceID resourceID); /** - * Sends heartbeat request from the resource manager + * Sends heartbeat request from the resource manager. * * @param resourceID unique id of the resource manager */ @@ -246,18 +247,24 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway, FencedRp CompletableFuture<JobDetails> requestJobDetails(@RpcTimeout Time timeout); /** - * Request the {@link ArchivedExecutionGraph} of the currently executed job. + * Requests the current job status. 
* * @param timeout for the rpc call - * @return Future archived execution graph derived from the currently executed job + * @return Future containing the current job status */ - CompletableFuture<AccessExecutionGraph> requestArchivedExecutionGraph(@RpcTimeout Time timeout); + CompletableFuture<JobStatus> requestJobStatus(@RpcTimeout Time timeout); /** - * Requests the current job status. + * Requests the {@link ArchivedExecutionGraph} for the given jobId. If there is no such graph, then + * the future is completed with a {@link FlinkJobNotFoundException}. * - * @param timeout for the rpc call - * @return Future containing the current job status + * <p>Note: We enforce that the returned future contains a {@link ArchivedExecutionGraph} unlike + * the super interface. + * + * @param jobId identifying the job whose AccessExecutionGraph is requested + * @param timeout for the asynchronous operation + * @return Future containing the AccessExecutionGraph for the given jobId, otherwise {@link FlinkJobNotFoundException} */ - CompletableFuture<JobStatus> requestJobStatus(@RpcTimeout Time timeout); + @Override + CompletableFuture<ArchivedExecutionGraph> requestJob(JobID jobId, @RpcTimeout Time timeout); } http://git-wip-us.apache.org/repos/asf/flink/blob/60b7b03f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java index f63b042..19186c4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java @@ -122,7 +122,7 @@ public class ExecutionGraphCache implements Closeable { } if (successfulUpdate) { - 
final CompletableFuture<AccessExecutionGraph> executionGraphFuture = restfulGateway.requestJob(jobId, timeout); + final CompletableFuture<? extends AccessExecutionGraph> executionGraphFuture = restfulGateway.requestJob(jobId, timeout); executionGraphFuture.whenComplete( (AccessExecutionGraph executionGraph, Throwable throwable) -> { http://git-wip-us.apache.org/repos/asf/flink/blob/60b7b03f/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java index 4160434..4da3947 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java @@ -54,14 +54,14 @@ public interface RestfulGateway extends RpcGateway { CompletableFuture<String> requestRestAddress(@RpcTimeout Time timeout); /** - * Requests the AccessExecutionGraph for the given jobId. If there is no such graph, then + * Requests the {@link AccessExecutionGraph} for the given jobId. If there is no such graph, then * the future is completed with a {@link FlinkJobNotFoundException}. * * @param jobId identifying the job whose AccessExecutionGraph is requested * @param timeout for the asynchronous operation * @return Future containing the AccessExecutionGraph for the given jobId, otherwise {@link FlinkJobNotFoundException} */ - CompletableFuture<AccessExecutionGraph> requestJob(JobID jobId, @RpcTimeout Time timeout); + CompletableFuture<? extends AccessExecutionGraph> requestJob(JobID jobId, @RpcTimeout Time timeout); /** * Requests job details currently being executed on the Flink cluster. 
http://git-wip-us.apache.org/repos/asf/flink/blob/60b7b03f/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java index ecadaa5..8bdaff5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java @@ -30,38 +30,49 @@ import org.apache.flink.runtime.executiongraph.ErrorInfo; import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobmanager.JobManager; -import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.messages.FlinkJobNotFoundException; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.TestingRestfulGateway; import org.apache.flink.util.ExecutorUtils; import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; import org.apache.flink.util.TestLogger; +import org.hamcrest.Matchers; +import org.junit.BeforeClass; import org.junit.Test; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import static 
org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; /** * Tests for the {@link ExecutionGraphCache}. */ public class ExecutionGraphCacheTest extends TestLogger { + private static ArchivedExecutionGraph expectedExecutionGraph; + private static final JobID expectedJobId = new JobID(); + + @BeforeClass + public static void setup() { + expectedExecutionGraph = new ArchivedExecutionGraphBuilder().build(); + } + /** * Tests that we can cache AccessExecutionGraphs over multiple accesses. */ @@ -69,23 +80,19 @@ public class ExecutionGraphCacheTest extends TestLogger { public void testExecutionGraphCaching() throws Exception { final Time timeout = Time.milliseconds(100L); final Time timeToLive = Time.hours(1L); - final JobID jobId = new JobID(); - final AccessExecutionGraph accessExecutionGraph = mock(AccessExecutionGraph.class); - final JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class); - when(jobManagerGateway.requestJob(eq(jobId), any(Time.class))).thenReturn(CompletableFuture.completedFuture(accessExecutionGraph)); + final CountingRestfulGateway restfulGateway = createCountingRestfulGateway(expectedJobId, CompletableFuture.completedFuture(expectedExecutionGraph)); try (ExecutionGraphCache executionGraphCache = new ExecutionGraphCache(timeout, timeToLive)) { - CompletableFuture<AccessExecutionGraph> accessExecutionGraphFuture = executionGraphCache.getExecutionGraph(jobId, jobManagerGateway); + CompletableFuture<AccessExecutionGraph> accessExecutionGraphFuture = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway); - assertEquals(accessExecutionGraph, accessExecutionGraphFuture.get()); + 
assertEquals(expectedExecutionGraph, accessExecutionGraphFuture.get()); - CompletableFuture<AccessExecutionGraph> accessExecutionGraphFuture2 = executionGraphCache.getExecutionGraph(jobId, jobManagerGateway); + accessExecutionGraphFuture = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway); - assertEquals(accessExecutionGraph, accessExecutionGraphFuture2.get()); + assertEquals(expectedExecutionGraph, accessExecutionGraphFuture.get()); - // verify that we only issued a single request to the gateway - verify(jobManagerGateway, times(1)).requestJob(eq(jobId), any(Time.class)); + assertThat(restfulGateway.getNumRequestJobCalls(), Matchers.equalTo(1)); } } @@ -96,25 +103,25 @@ public class ExecutionGraphCacheTest extends TestLogger { public void testExecutionGraphEntryInvalidation() throws Exception { final Time timeout = Time.milliseconds(100L); final Time timeToLive = Time.milliseconds(1L); - final JobID jobId = new JobID(); - final AccessExecutionGraph accessExecutionGraph = mock(AccessExecutionGraph.class); - final JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class); - when(jobManagerGateway.requestJob(eq(jobId), any(Time.class))).thenReturn(CompletableFuture.completedFuture(accessExecutionGraph)); + final CountingRestfulGateway restfulGateway = createCountingRestfulGateway( + expectedJobId, + CompletableFuture.completedFuture(expectedExecutionGraph), + CompletableFuture.completedFuture(expectedExecutionGraph)); try (ExecutionGraphCache executionGraphCache = new ExecutionGraphCache(timeout, timeToLive)) { - CompletableFuture<AccessExecutionGraph> executionGraphFuture = executionGraphCache.getExecutionGraph(jobId, jobManagerGateway); + CompletableFuture<AccessExecutionGraph> executionGraphFuture = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway); - assertEquals(accessExecutionGraph, executionGraphFuture.get()); + assertEquals(expectedExecutionGraph, executionGraphFuture.get()); // sleep for the TTL - 
Thread.sleep(timeToLive.toMilliseconds()); + Thread.sleep(timeToLive.toMilliseconds() * 5L); - CompletableFuture<AccessExecutionGraph> executionGraphFuture2 = executionGraphCache.getExecutionGraph(jobId, jobManagerGateway); + CompletableFuture<AccessExecutionGraph> executionGraphFuture2 = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway); - assertEquals(accessExecutionGraph, executionGraphFuture2.get()); + assertEquals(expectedExecutionGraph, executionGraphFuture2.get()); - verify(jobManagerGateway, times(2)).requestJob(eq(jobId), any(Time.class)); + assertThat(restfulGateway.getNumRequestJobCalls(), Matchers.equalTo(2)); } } @@ -127,18 +134,15 @@ public class ExecutionGraphCacheTest extends TestLogger { public void testImmediateCacheInvalidationAfterFailure() throws Exception { final Time timeout = Time.milliseconds(100L); final Time timeToLive = Time.hours(1L); - final JobID jobId = new JobID(); - final AccessExecutionGraph accessExecutionGraph = mock(AccessExecutionGraph.class); - - final JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class); // let's first answer with a JobNotFoundException and then only with the correct result - when(jobManagerGateway.requestJob(eq(jobId), any(Time.class))).thenReturn( - FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId)), - CompletableFuture.completedFuture(accessExecutionGraph)); + final CountingRestfulGateway restfulGateway = createCountingRestfulGateway( + expectedJobId, + FutureUtils.completedExceptionally(new FlinkJobNotFoundException(expectedJobId)), + CompletableFuture.completedFuture(expectedExecutionGraph)); try (ExecutionGraphCache executionGraphCache = new ExecutionGraphCache(timeout, timeToLive)) { - CompletableFuture<AccessExecutionGraph> executionGraphFuture = executionGraphCache.getExecutionGraph(jobId, jobManagerGateway); + CompletableFuture<AccessExecutionGraph> executionGraphFuture = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway); 
try { executionGraphFuture.get(); @@ -148,9 +152,9 @@ public class ExecutionGraphCacheTest extends TestLogger { assertTrue(ee.getCause() instanceof FlinkException); } - CompletableFuture<AccessExecutionGraph> executionGraphFuture2 = executionGraphCache.getExecutionGraph(jobId, jobManagerGateway); + CompletableFuture<AccessExecutionGraph> executionGraphFuture2 = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway); - assertEquals(accessExecutionGraph, executionGraphFuture2.get()); + assertEquals(expectedExecutionGraph, executionGraphFuture2.get()); } } @@ -162,27 +166,36 @@ public class ExecutionGraphCacheTest extends TestLogger { public void testCacheEntryCleanup() throws Exception { final Time timeout = Time.milliseconds(100L); final Time timeToLive = Time.milliseconds(1L); - final JobID jobId1 = new JobID(); - final JobID jobId2 = new JobID(); - final AccessExecutionGraph accessExecutionGraph1 = mock(AccessExecutionGraph.class); - final AccessExecutionGraph accessExecutionGraph2 = mock(AccessExecutionGraph.class); - - final JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class); - when(jobManagerGateway.requestJob(eq(jobId1), any(Time.class))).thenReturn(CompletableFuture.completedFuture(accessExecutionGraph1)); - when(jobManagerGateway.requestJob(eq(jobId2), any(Time.class))).thenReturn(CompletableFuture.completedFuture(accessExecutionGraph2)); + final JobID expectedJobId2 = new JobID(); + final ArchivedExecutionGraph expectedExecutionGraph2 = new ArchivedExecutionGraphBuilder().build(); + + final AtomicInteger requestJobCalls = new AtomicInteger(0); + final TestingRestfulGateway restfulGateway = TestingRestfulGateway.newBuilder() + .setRequestJobFunction( + jobId -> { + requestJobCalls.incrementAndGet(); + if (jobId.equals(expectedJobId)) { + return CompletableFuture.completedFuture(expectedExecutionGraph); + } else if (jobId.equals(expectedJobId2)) { + return CompletableFuture.completedFuture(expectedExecutionGraph2); + } else { + 
throw new AssertionError("Invalid job id received."); + } + } + ) + .build(); try (ExecutionGraphCache executionGraphCache = new ExecutionGraphCache(timeout, timeToLive)) { - CompletableFuture<AccessExecutionGraph> executionGraph1Future = executionGraphCache.getExecutionGraph(jobId1, jobManagerGateway); + CompletableFuture<AccessExecutionGraph> executionGraph1Future = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway); - CompletableFuture<AccessExecutionGraph> executionGraph2Future = executionGraphCache.getExecutionGraph(jobId2, jobManagerGateway); + CompletableFuture<AccessExecutionGraph> executionGraph2Future = executionGraphCache.getExecutionGraph(expectedJobId2, restfulGateway); - assertEquals(accessExecutionGraph1, executionGraph1Future.get()); + assertEquals(expectedExecutionGraph, executionGraph1Future.get()); - assertEquals(accessExecutionGraph2, executionGraph2Future.get()); + assertEquals(expectedExecutionGraph2, executionGraph2Future.get()); - verify(jobManagerGateway, times(1)).requestJob(eq(jobId1), any(Time.class)); - verify(jobManagerGateway, times(1)).requestJob(eq(jobId2), any(Time.class)); + assertThat(requestJobCalls.get(), Matchers.equalTo(2)); Thread.sleep(timeToLive.toMilliseconds()); @@ -199,12 +212,8 @@ public class ExecutionGraphCacheTest extends TestLogger { public void testConcurrentAccess() throws Exception { final Time timeout = Time.milliseconds(100L); final Time timeToLive = Time.hours(1L); - final JobID jobId = new JobID(); - - final AccessExecutionGraph accessExecutionGraph = mock(AccessExecutionGraph.class); - final JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class); - when(jobManagerGateway.requestJob(eq(jobId), any(Time.class))).thenReturn(CompletableFuture.completedFuture(accessExecutionGraph)); + final CountingRestfulGateway restfulGateway = createCountingRestfulGateway(expectedJobId, CompletableFuture.completedFuture(expectedExecutionGraph)); final int numConcurrentAccesses = 10; @@ -216,7 
+225,7 @@ public class ExecutionGraphCacheTest extends TestLogger { for (int i = 0; i < numConcurrentAccesses; i++) { CompletableFuture<AccessExecutionGraph> executionGraphFuture = CompletableFuture .supplyAsync( - () -> executionGraphCache.getExecutionGraph(jobId, jobManagerGateway), + () -> executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway), executor) .thenCompose(Function.identity()); @@ -228,10 +237,10 @@ public class ExecutionGraphCacheTest extends TestLogger { Collection<AccessExecutionGraph> allExecutionGraphs = allExecutionGraphFutures.get(); for (AccessExecutionGraph executionGraph : allExecutionGraphs) { - assertEquals(accessExecutionGraph, executionGraph); + assertEquals(expectedExecutionGraph, executionGraph); } - verify(jobManagerGateway, times(1)).requestJob(eq(jobId), any(Time.class)); + assertThat(restfulGateway.getNumRequestJobCalls(), Matchers.equalTo(1)); } finally { ExecutorUtils.gracefulShutdown(5000L, TimeUnit.MILLISECONDS, executor); } @@ -248,27 +257,32 @@ public class ExecutionGraphCacheTest extends TestLogger { public void testCacheInvalidationIfSuspended() throws Exception { final Time timeout = Time.milliseconds(100L); final Time timeToLive = Time.hours(1L); - final JobID jobId = new JobID(); + final JobID expectedJobId = new JobID(); - final AccessExecutionGraph accessExecutionGraph = mock(AccessExecutionGraph.class); + final ArchivedExecutionGraph suspendedExecutionGraph = new ArchivedExecutionGraphBuilder().setState(JobStatus.SUSPENDED).build(); + final ConcurrentLinkedQueue<CompletableFuture<? 
extends AccessExecutionGraph>> requestJobAnswers = new ConcurrentLinkedQueue<>(); - final AccessExecutionGraph suspendedExecutionGraph = mock(AccessExecutionGraph.class); - when(suspendedExecutionGraph.getState()).thenReturn(JobStatus.SUSPENDED); + requestJobAnswers.offer(CompletableFuture.completedFuture(suspendedExecutionGraph)); + requestJobAnswers.offer(CompletableFuture.completedFuture(expectedExecutionGraph)); - final JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class); - // let's first answer with a suspended ExecutionGraph and then only with the correct result - when(jobManagerGateway.requestJob(eq(jobId), any(Time.class))).thenReturn( - CompletableFuture.completedFuture(suspendedExecutionGraph), - CompletableFuture.completedFuture(accessExecutionGraph)); + final TestingRestfulGateway restfulGateway = TestingRestfulGateway.newBuilder() + .setRequestJobFunction( + jobId -> { + assertThat(jobId, Matchers.equalTo(expectedJobId)); + + return requestJobAnswers.poll(); + } + ) + .build(); try (ExecutionGraphCache executionGraphCache = new ExecutionGraphCache(timeout, timeToLive)) { - CompletableFuture<AccessExecutionGraph> executionGraphFuture = executionGraphCache.getExecutionGraph(jobId, jobManagerGateway); + CompletableFuture<AccessExecutionGraph> executionGraphFuture = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway); assertEquals(suspendedExecutionGraph, executionGraphFuture.get()); - CompletableFuture<AccessExecutionGraph> executionGraphFuture2 = executionGraphCache.getExecutionGraph(jobId, jobManagerGateway); + CompletableFuture<AccessExecutionGraph> executionGraphFuture2 = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway); - assertEquals(accessExecutionGraph, executionGraphFuture2.get()); + assertEquals(expectedExecutionGraph, executionGraphFuture2.get()); } } @@ -283,35 +297,65 @@ public class ExecutionGraphCacheTest extends TestLogger { public void testCacheInvalidationIfSwitchToSuspended() throws 
Exception { final Time timeout = Time.milliseconds(100L); final Time timeToLive = Time.hours(1L); - final JobID jobId = new JobID(); - - final AccessExecutionGraph accessExecutionGraph = mock(AccessExecutionGraph.class); + final JobID expectedJobId = new JobID(); - final SuspendableAccessExecutionGraph toBeSuspendedExecutionGraph = new SuspendableAccessExecutionGraph(jobId); + final SuspendableAccessExecutionGraph toBeSuspendedExecutionGraph = new SuspendableAccessExecutionGraph(expectedJobId); - final JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class); - // let's first answer with a JobNotFoundException and then only with the correct result - when(jobManagerGateway.requestJob(eq(jobId), any(Time.class))).thenReturn( + final CountingRestfulGateway restfulGateway = createCountingRestfulGateway( + expectedJobId, CompletableFuture.completedFuture(toBeSuspendedExecutionGraph), - CompletableFuture.completedFuture(accessExecutionGraph)); + CompletableFuture.completedFuture(expectedExecutionGraph)); try (ExecutionGraphCache executionGraphCache = new ExecutionGraphCache(timeout, timeToLive)) { - CompletableFuture<AccessExecutionGraph> executionGraphFuture = executionGraphCache.getExecutionGraph(jobId, jobManagerGateway); + CompletableFuture<AccessExecutionGraph> executionGraphFuture = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway); assertEquals(toBeSuspendedExecutionGraph, executionGraphFuture.get()); toBeSuspendedExecutionGraph.setJobStatus(JobStatus.SUSPENDED); // retrieve the same job from the cache again --> this should return it and invalidate the cache entry - CompletableFuture<AccessExecutionGraph> executionGraphFuture2 = executionGraphCache.getExecutionGraph(jobId, jobManagerGateway); + CompletableFuture<AccessExecutionGraph> executionGraphFuture2 = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway); - assertEquals(accessExecutionGraph, executionGraphFuture2.get()); + assertEquals(expectedExecutionGraph, 
executionGraphFuture2.get()); - CompletableFuture<AccessExecutionGraph> executionGraphFuture3 = executionGraphCache.getExecutionGraph(jobId, jobManagerGateway); + CompletableFuture<AccessExecutionGraph> executionGraphFuture3 = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway); - assertEquals(accessExecutionGraph, executionGraphFuture3.get()); + assertEquals(expectedExecutionGraph, executionGraphFuture3.get()); + + assertThat(restfulGateway.getNumRequestJobCalls(), Matchers.equalTo(2)); + } + } + + private CountingRestfulGateway createCountingRestfulGateway(JobID jobId, CompletableFuture<? extends AccessExecutionGraph>... accessExecutionGraphs) { + final ConcurrentLinkedQueue<CompletableFuture<? extends AccessExecutionGraph>> queue = new ConcurrentLinkedQueue<>(Arrays.asList(accessExecutionGraphs)); + return new CountingRestfulGateway( + jobId, + ignored -> queue.poll()); + } + + /** + * {@link RestfulGateway} implementation which counts the number of {@link #requestJob(JobID, Time)} calls. + */ + private static class CountingRestfulGateway extends TestingRestfulGateway { + + private final JobID expectedJobId; + + private AtomicInteger numRequestJobCalls = new AtomicInteger(0); + + private CountingRestfulGateway(JobID expectedJobId, Function<JobID, CompletableFuture<? extends AccessExecutionGraph>> requestJobFunction) { + this.expectedJobId = Preconditions.checkNotNull(expectedJobId); + this.requestJobFunction = Preconditions.checkNotNull(requestJobFunction); + } + + @Override + public CompletableFuture<? 
extends AccessExecutionGraph> requestJob(JobID jobId, Time timeout) { + assertThat(jobId, Matchers.equalTo(expectedJobId)); + numRequestJobCalls.incrementAndGet(); + return super.requestJob(jobId, timeout); + } - verify(jobManagerGateway, times(2)).requestJob(eq(jobId), any(Time.class)); + public int getNumRequestJobCalls() { + return numRequestJobCalls.get(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/60b7b03f/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java new file mode 100644 index 0000000..1e27d8e --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.webmonitor; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.messages.webmonitor.ClusterOverview; +import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; + +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; +import java.util.function.Supplier; + +/** + * Testing implementation of the {@link RestfulGateway}. + */ +public class TestingRestfulGateway implements RestfulGateway { + + static final Function<JobID, CompletableFuture<? extends AccessExecutionGraph>> DEFAULT_REQUEST_JOB_FUNCTION = jobId -> FutureUtils.completedExceptionally(new UnsupportedOperationException()); + static final Supplier<CompletableFuture<MultipleJobsDetails>> DEFAULT_REQUEST_MULTIPLE_JOB_DETAILS_SUPPLIER = () -> CompletableFuture.completedFuture(new MultipleJobsDetails(Collections.emptyList())); + static final Supplier<CompletableFuture<ClusterOverview>> DEFAULT_REQUEST_CLUSTER_OVERVIEW_SUPPLIER = () -> CompletableFuture.completedFuture(new ClusterOverview(0, 0, 0, 0, 0, 0, 0)); + static final Supplier<CompletableFuture<Collection<String>>> DEFAULT_REQUEST_METRIC_QUERY_SERVICE_PATHS_SUPPLIER = () -> CompletableFuture.completedFuture(Collections.emptyList()); + static final Supplier<CompletableFuture<Collection<Tuple2<ResourceID, String>>>> DEFAULT_REQUEST_TASK_MANAGER_METRIC_QUERY_SERVICE_PATHS_SUPPLIER = () -> CompletableFuture.completedFuture(Collections.emptyList()); + static final String LOCALHOST = "localhost"; + + protected String address; + + protected String hostname; + + protected String restAddress; + + protected 
Function<JobID, CompletableFuture<? extends AccessExecutionGraph>> requestJobFunction; + + protected Supplier<CompletableFuture<MultipleJobsDetails>> requestMultipleJobDetailsSupplier; + + protected Supplier<CompletableFuture<ClusterOverview>> requestClusterOverviewSupplier; + + protected Supplier<CompletableFuture<Collection<String>>> requestMetricQueryServicePathsSupplier; + + protected Supplier<CompletableFuture<Collection<Tuple2<ResourceID, String>>>> requestTaskManagerMetricQueryServicePathsSupplier; + + public TestingRestfulGateway() { + this( + LOCALHOST, + LOCALHOST, + LOCALHOST, + DEFAULT_REQUEST_JOB_FUNCTION, + DEFAULT_REQUEST_MULTIPLE_JOB_DETAILS_SUPPLIER, + DEFAULT_REQUEST_CLUSTER_OVERVIEW_SUPPLIER, + DEFAULT_REQUEST_METRIC_QUERY_SERVICE_PATHS_SUPPLIER, + DEFAULT_REQUEST_TASK_MANAGER_METRIC_QUERY_SERVICE_PATHS_SUPPLIER); + } + + public TestingRestfulGateway( + String address, + String hostname, + String restAddress, + Function<JobID, CompletableFuture<? extends AccessExecutionGraph>> requestJobFunction, + Supplier<CompletableFuture<MultipleJobsDetails>> requestMultipleJobDetailsSupplier, + Supplier<CompletableFuture<ClusterOverview>> requestClusterOverviewSupplier, + Supplier<CompletableFuture<Collection<String>>> requestMetricQueryServicePathsSupplier, + Supplier<CompletableFuture<Collection<Tuple2<ResourceID, String>>>> requestTaskManagerMetricQueryServicePathsSupplier) { + this.address = address; + this.hostname = hostname; + this.restAddress = restAddress; + this.requestJobFunction = requestJobFunction; + this.requestMultipleJobDetailsSupplier = requestMultipleJobDetailsSupplier; + this.requestClusterOverviewSupplier = requestClusterOverviewSupplier; + this.requestMetricQueryServicePathsSupplier = requestMetricQueryServicePathsSupplier; + this.requestTaskManagerMetricQueryServicePathsSupplier = requestTaskManagerMetricQueryServicePathsSupplier; + } + + @Override + public CompletableFuture<String> requestRestAddress(Time timeout) { + return 
CompletableFuture.completedFuture(restAddress); + } + + @Override + public CompletableFuture<? extends AccessExecutionGraph> requestJob(JobID jobId, Time timeout) { + return requestJobFunction.apply(jobId); + } + + @Override + public CompletableFuture<MultipleJobsDetails> requestMultipleJobDetails(Time timeout) { + return requestMultipleJobDetailsSupplier.get(); + } + + @Override + public CompletableFuture<ClusterOverview> requestClusterOverview(Time timeout) { + return requestClusterOverviewSupplier.get(); + } + + @Override + public CompletableFuture<Collection<String>> requestMetricQueryServicePaths(Time timeout) { + return requestMetricQueryServicePathsSupplier.get(); + } + + @Override + public CompletableFuture<Collection<Tuple2<ResourceID, String>>> requestTaskManagerMetricQueryServicePaths(Time timeout) { + return requestTaskManagerMetricQueryServicePathsSupplier.get(); + } + + @Override + public String getAddress() { + return address; + } + + @Override + public String getHostname() { + return hostname; + } + + public static Builder newBuilder() { + return new Builder(); + } + + /** + * Builder for the {@link TestingRestfulGateway}. + */ + public static final class Builder { + private String address = LOCALHOST; + private String hostname = LOCALHOST; + private String restAddress = LOCALHOST; + private Function<JobID, CompletableFuture<? 
extends AccessExecutionGraph>> requestJobFunction; + private Supplier<CompletableFuture<MultipleJobsDetails>> requestMultipleJobDetailsSupplier; + private Supplier<CompletableFuture<ClusterOverview>> requestClusterOverviewSupplier; + private Supplier<CompletableFuture<Collection<String>>> requestMetricQueryServicePathsSupplier; + private Supplier<CompletableFuture<Collection<Tuple2<ResourceID, String>>>> requestTaskManagerMetricQueryServicePathsSupplier; + + public Builder() { + requestJobFunction = DEFAULT_REQUEST_JOB_FUNCTION; + requestMultipleJobDetailsSupplier = DEFAULT_REQUEST_MULTIPLE_JOB_DETAILS_SUPPLIER; + requestClusterOverviewSupplier = DEFAULT_REQUEST_CLUSTER_OVERVIEW_SUPPLIER; + requestMetricQueryServicePathsSupplier = DEFAULT_REQUEST_METRIC_QUERY_SERVICE_PATHS_SUPPLIER; + requestTaskManagerMetricQueryServicePathsSupplier = DEFAULT_REQUEST_TASK_MANAGER_METRIC_QUERY_SERVICE_PATHS_SUPPLIER; + } + + public Builder setAddress(String address) { + this.address = address; + return this; + } + + public Builder setHostname(String hostname) { + this.hostname = hostname; + return this; + } + + public Builder setRestAddress(String restAddress) { + this.restAddress = restAddress; + return this; + } + + public Builder setRequestJobFunction(Function<JobID, CompletableFuture<? 
extends AccessExecutionGraph>> requestJobFunction) { + this.requestJobFunction = requestJobFunction; + return this; + } + + public Builder setRequestMultipleJobDetailsSupplier(Supplier<CompletableFuture<MultipleJobsDetails>> requestMultipleJobDetailsSupplier) { + this.requestMultipleJobDetailsSupplier = requestMultipleJobDetailsSupplier; + return this; + } + + public Builder setRequestClusterOverviewSupplier(Supplier<CompletableFuture<ClusterOverview>> requestClusterOverviewSupplier) { + this.requestClusterOverviewSupplier = requestClusterOverviewSupplier; + return this; + } + + public Builder setRequestMetricQueryServicePathsSupplier(Supplier<CompletableFuture<Collection<String>>> requestMetricQueryServicePathsSupplier) { + this.requestMetricQueryServicePathsSupplier = requestMetricQueryServicePathsSupplier; + return this; + } + + public Builder setRequestTaskManagerMetricQueryServicePathsSupplier(Supplier<CompletableFuture<Collection<Tuple2<ResourceID, String>>>> requestTaskManagerMetricQueryServicePathsSupplier) { + this.requestTaskManagerMetricQueryServicePathsSupplier = requestTaskManagerMetricQueryServicePathsSupplier; + return this; + } + + public TestingRestfulGateway build() { + return new TestingRestfulGateway(address, hostname, restAddress, requestJobFunction, requestMultipleJobDetailsSupplier, requestClusterOverviewSupplier, requestMetricQueryServicePathsSupplier, requestTaskManagerMetricQueryServicePathsSupplier); + } + } +}
