[
https://issues.apache.org/jira/browse/FLINK-8450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16339499#comment-16339499
]
ASF GitHub Bot commented on FLINK-8450:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5309#discussion_r163902431
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java
---
@@ -30,62 +30,69 @@
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
+import org.hamcrest.Matchers;
+import org.junit.BeforeClass;
import org.junit.Test;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
/**
* Tests for the {@link ExecutionGraphCache}.
*/
public class ExecutionGraphCacheTest extends TestLogger {
+ private static ArchivedExecutionGraph expectedExecutionGraph;
+ private static final JobID expectedJobId = new JobID();
+
+ @BeforeClass
+ public static void setup() {
+ expectedExecutionGraph = new
ArchivedExecutionGraphBuilder().build();
+ }
+
/**
* Tests that we can cache AccessExecutionGraphs over multiple accesses.
*/
@Test
public void testExecutionGraphCaching() throws Exception {
final Time timeout = Time.milliseconds(100L);
final Time timeToLive = Time.hours(1L);
- final JobID jobId = new JobID();
- final AccessExecutionGraph accessExecutionGraph =
mock(AccessExecutionGraph.class);
- final JobManagerGateway jobManagerGateway =
mock(JobManagerGateway.class);
- when(jobManagerGateway.requestJob(eq(jobId),
any(Time.class))).thenReturn(CompletableFuture.completedFuture(accessExecutionGraph));
+ final CountingRestfulGateway restfulGateway =
createCountingRestfulGateway(expectedJobId,
CompletableFuture.completedFuture(expectedExecutionGraph));
try (ExecutionGraphCache executionGraphCache = new
ExecutionGraphCache(timeout, timeToLive)) {
- CompletableFuture<AccessExecutionGraph>
accessExecutionGraphFuture = executionGraphCache.getExecutionGraph(jobId,
jobManagerGateway);
+ CompletableFuture<AccessExecutionGraph>
accessExecutionGraphFuture =
executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);
- assertEquals(accessExecutionGraph,
accessExecutionGraphFuture.get());
+ assertEquals(expectedExecutionGraph,
accessExecutionGraphFuture.get());
- CompletableFuture<AccessExecutionGraph>
accessExecutionGraphFuture2 = executionGraphCache.getExecutionGraph(jobId,
jobManagerGateway);
+ CompletableFuture<AccessExecutionGraph>
accessExecutionGraphFuture2 =
executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);
- assertEquals(accessExecutionGraph,
accessExecutionGraphFuture2.get());
+ assertEquals(expectedExecutionGraph,
accessExecutionGraphFuture2.get());
--- End diff --
True, will remove it.
> Make JobMaster/DispatcherGateway#requestJob type safe
> -----------------------------------------------------
>
> Key: FLINK-8450
> URL: https://issues.apache.org/jira/browse/FLINK-8450
> Project: Flink
> Issue Type: Improvement
> Components: Distributed Coordination
> Affects Versions: 1.5.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Priority: Minor
> Labels: flip-6
> Fix For: 1.5.0
>
>
> Currently, the {{RestfulGateway#requestJob}} returns a
> {{CompletableFuture<AccessExecutionGraph>}}. Since {{AccessExecutionGraph}}
> is non serializable it could fail if we execute this RPC from a remote
> system. In order to make it typesafe we should change its signature to
> {{SerializableExecutionGraph}}.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)