This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 397c9c60e9d24556945d9c59507bfb0dbd6e31a6
Author: Till Rohrmann <trohrm...@apache.org>
AuthorDate: Wed Feb 20 12:12:30 2019 +0100

    [FLINK-11678] Let ExecutionGraphCache store only ArchivedExecutionGraphs
    
    The ExecutionGraphCache now only stores ArchivedExecutionGraphs since it will
    never receive an ExecutionGraph from the RestfulGateway.
    
    - Remove ExecutionGraphCacheTest#testCacheInvalidationIfSuspended
    - Remove ExecutionGraphCacheTest#testCacheInvalidationIfSwitchToSuspended
    
    This closes #7769.
---
 .../rest/handler/legacy/ExecutionGraphCache.java   | 55 +++++----------
 .../handler/legacy/ExecutionGraphCacheTest.java    | 81 ----------------------
 .../runtime/webmonitor/TestingRestfulGateway.java  |  1 -
 3 files changed, 19 insertions(+), 118 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java
index 7e9e6ee..f634a62 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java
@@ -21,21 +21,21 @@ package org.apache.flink.runtime.rest.handler.legacy;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.util.Preconditions;
 
 import java.io.Closeable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * Cache for {@link AccessExecutionGraph} which are obtained from the Flink 
cluster. Every cache entry
+ * Cache for {@link ArchivedExecutionGraph} which are obtained from the Flink 
cluster. Every cache entry
  * has an associated time to live after which a new request will trigger the 
reloading of the
- * {@link AccessExecutionGraph} from the cluster.
+ * {@link ArchivedExecutionGraph} from the cluster.
  */
 public class ExecutionGraphCache implements Closeable {
 
@@ -76,12 +76,15 @@ public class ExecutionGraphCache implements Closeable {
         * {@link AccessExecutionGraph} will be requested again after the 
refresh interval has passed
         * or if the graph could not be retrieved from the given gateway.
         *
-        * @param jobId identifying the {@link AccessExecutionGraph} to get
-        * @param restfulGateway to request the {@link AccessExecutionGraph} 
from
-        * @return Future containing the requested {@link AccessExecutionGraph}
+        * @param jobId identifying the {@link ArchivedExecutionGraph} to get
+        * @param restfulGateway to request the {@link ArchivedExecutionGraph} 
from
+        * @return Future containing the requested {@link 
ArchivedExecutionGraph}
         */
        public CompletableFuture<AccessExecutionGraph> getExecutionGraph(JobID 
jobId, RestfulGateway restfulGateway) {
+               return getExecutionGraphInternal(jobId, 
restfulGateway).thenApply(Function.identity());
+       }
 
+       private CompletableFuture<ArchivedExecutionGraph> 
getExecutionGraphInternal(JobID jobId, RestfulGateway restfulGateway) {
                Preconditions.checkState(running, "ExecutionGraphCache is no 
longer running");
 
                while (true) {
@@ -89,26 +92,12 @@ public class ExecutionGraphCache implements Closeable {
 
                        final long currentTime = System.currentTimeMillis();
 
-                       if (oldEntry != null) {
-                               if (currentTime < oldEntry.getTTL()) {
-                                       final 
CompletableFuture<AccessExecutionGraph> executionGraphFuture = 
oldEntry.getExecutionGraphFuture();
-                                       if (executionGraphFuture.isDone() && 
!executionGraphFuture.isCompletedExceptionally()) {
-
-                                               // TODO: Remove once we no 
longer request the actual ExecutionGraph from the JobManager but only the 
ArchivedExecutionGraph
-                                               try {
-                                                       final 
AccessExecutionGraph executionGraph = executionGraphFuture.get();
-                                                       if 
(executionGraph.getState() != JobStatus.SUSPENDED) {
-                                                               return 
executionGraphFuture;
-                                                       }
-                                                       // send a new request 
to get the ExecutionGraph from the new leader
-                                               } catch (InterruptedException | 
ExecutionException e) {
-                                                       throw new 
RuntimeException("Could not retrieve ExecutionGraph from the orderly completed 
future. This should never happen.", e);
-                                               }
-                                       } else if 
(!executionGraphFuture.isDone()) {
-                                               return executionGraphFuture;
-                                       }
-                                       // otherwise it must be completed 
exceptionally
+                       if (oldEntry != null && currentTime < 
oldEntry.getTTL()) {
+                               final CompletableFuture<ArchivedExecutionGraph> 
executionGraphFuture = oldEntry.getExecutionGraphFuture();
+                               if 
(!executionGraphFuture.isCompletedExceptionally()) {
+                                       return executionGraphFuture;
                                }
+                               // otherwise it must be completed exceptionally
                        }
 
                        final ExecutionGraphEntry newEntry = new 
ExecutionGraphEntry(currentTime + timeToLive.toMilliseconds());
@@ -124,10 +113,10 @@ public class ExecutionGraphCache implements Closeable {
                        }
 
                        if (successfulUpdate) {
-                               final CompletableFuture<? extends 
AccessExecutionGraph> executionGraphFuture = restfulGateway.requestJob(jobId, 
timeout);
+                               final CompletableFuture<ArchivedExecutionGraph> 
executionGraphFuture = restfulGateway.requestJob(jobId, timeout);
 
                                executionGraphFuture.whenComplete(
-                                       (AccessExecutionGraph executionGraph, 
Throwable throwable) -> {
+                                       (ArchivedExecutionGraph executionGraph, 
Throwable throwable) -> {
                                                if (throwable != null) {
                                                        
newEntry.getExecutionGraphFuture().completeExceptionally(throwable);
 
@@ -135,12 +124,6 @@ public class ExecutionGraphCache implements Closeable {
                                                        
cachedExecutionGraphs.remove(jobId, newEntry);
                                                } else {
                                                        
newEntry.getExecutionGraphFuture().complete(executionGraph);
-
-                                                       // TODO: Remove once we 
no longer request the actual ExecutionGraph from the JobManager but only the 
ArchivedExecutionGraph
-                                                       if 
(executionGraph.getState() == JobStatus.SUSPENDED) {
-                                                               // remove the 
entry in case of suspension --> triggers new request when accessed next time
-                                                               
cachedExecutionGraphs.remove(jobId, newEntry);
-                                                       }
                                                }
                                        });
 
@@ -171,7 +154,7 @@ public class ExecutionGraphCache implements Closeable {
        private static final class ExecutionGraphEntry {
                private final long ttl;
 
-               private final CompletableFuture<AccessExecutionGraph> 
executionGraphFuture;
+               private final CompletableFuture<ArchivedExecutionGraph> 
executionGraphFuture;
 
                ExecutionGraphEntry(long ttl) {
                        this.ttl = ttl;
@@ -182,7 +165,7 @@ public class ExecutionGraphCache implements Closeable {
                        return ttl;
                }
 
-               public CompletableFuture<AccessExecutionGraph> 
getExecutionGraphFuture() {
+               public CompletableFuture<ArchivedExecutionGraph> 
getExecutionGraphFuture() {
                        return executionGraphFuture;
                }
        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java
index bd0e659..545b34a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java
@@ -27,7 +27,6 @@ import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ErrorInfo;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
@@ -245,86 +244,6 @@ public class ExecutionGraphCacheTest extends TestLogger {
                }
        }
 
-       /**
-        * Tests that a cache entry is invalidated if the retrieved {@link 
AccessExecutionGraph} is in
-        * state {@link JobStatus#SUSPENDED}.
-        *
-        * <p>This test can be removed once we no longer request the actual 
{@link ExecutionGraph} from the
-        * JobManager.
-        */
-       @Test
-       public void testCacheInvalidationIfSuspended() throws Exception {
-               final Time timeout = Time.milliseconds(100L);
-               final Time timeToLive = Time.hours(1L);
-               final JobID expectedJobId = new JobID();
-
-               final ArchivedExecutionGraph suspendedExecutionGraph = new 
ArchivedExecutionGraphBuilder().setState(JobStatus.SUSPENDED).build();
-               final 
ConcurrentLinkedQueue<CompletableFuture<ArchivedExecutionGraph>> 
requestJobAnswers = new ConcurrentLinkedQueue<>();
-
-               
requestJobAnswers.offer(CompletableFuture.completedFuture(suspendedExecutionGraph));
-               
requestJobAnswers.offer(CompletableFuture.completedFuture(expectedExecutionGraph));
-
-               final TestingRestfulGateway restfulGateway = 
TestingRestfulGateway.newBuilder()
-                       .setRequestJobFunction(
-                               jobId -> {
-                                       assertThat(jobId, 
Matchers.equalTo(expectedJobId));
-
-                                       return requestJobAnswers.poll();
-                               }
-                       )
-                       .build();
-
-               try (ExecutionGraphCache executionGraphCache = new 
ExecutionGraphCache(timeout, timeToLive)) {
-                       CompletableFuture<AccessExecutionGraph> 
executionGraphFuture = executionGraphCache.getExecutionGraph(expectedJobId, 
restfulGateway);
-
-                       assertEquals(suspendedExecutionGraph, 
executionGraphFuture.get());
-
-                       executionGraphFuture = 
executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);
-
-                       assertEquals(expectedExecutionGraph, 
executionGraphFuture.get());
-               }
-       }
-
-       /**
-        * Tests that a cache entry is invalidated if the retrieved {@link 
AccessExecutionGraph} changes its
-        * state to {@link JobStatus#SUSPENDED}.
-        *
-        * <p>This test can be removed once we no longer request the actual 
{@link ExecutionGraph} from the
-        * JobManager.
-        */
-       @Test
-       public void testCacheInvalidationIfSwitchToSuspended() throws Exception 
{
-               final Time timeout = Time.milliseconds(100L);
-               final Time timeToLive = Time.hours(1L);
-               final JobID expectedJobId = new JobID();
-
-               final SuspendableAccessExecutionGraph 
toBeSuspendedExecutionGraph = new 
SuspendableAccessExecutionGraph(expectedJobId);
-
-               final CountingRestfulGateway restfulGateway = 
createCountingRestfulGateway(
-                       expectedJobId,
-                       
CompletableFuture.completedFuture(toBeSuspendedExecutionGraph),
-                       
CompletableFuture.completedFuture(expectedExecutionGraph));
-
-               try (ExecutionGraphCache executionGraphCache = new 
ExecutionGraphCache(timeout, timeToLive)) {
-                       CompletableFuture<AccessExecutionGraph> 
executionGraphFuture = executionGraphCache.getExecutionGraph(expectedJobId, 
restfulGateway);
-
-                       assertEquals(toBeSuspendedExecutionGraph, 
executionGraphFuture.get());
-
-                       
toBeSuspendedExecutionGraph.setJobStatus(JobStatus.SUSPENDED);
-
-                       // retrieve the same job from the cache again --> this 
should return it and invalidate the cache entry
-                       executionGraphFuture = 
executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);
-
-                       assertEquals(expectedExecutionGraph, 
executionGraphFuture.get());
-
-                       executionGraphFuture = 
executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);
-
-                       assertEquals(expectedExecutionGraph, 
executionGraphFuture.get());
-
-                       assertThat(restfulGateway.getNumRequestJobCalls(), 
Matchers.equalTo(2));
-               }
-       }
-
        private CountingRestfulGateway createCountingRestfulGateway(JobID 
jobId, CompletableFuture<ArchivedExecutionGraph>... accessExecutionGraphs) {
                final 
ConcurrentLinkedQueue<CompletableFuture<ArchivedExecutionGraph>> queue = new 
ConcurrentLinkedQueue<>(Arrays.asList(accessExecutionGraphs));
                return new CountingRestfulGateway(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
index de79e36..68abbf0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;

Reply via email to