This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 397c9c60e9d24556945d9c59507bfb0dbd6e31a6 Author: Till Rohrmann <trohrm...@apache.org> AuthorDate: Wed Feb 20 12:12:30 2019 +0100 [FLINK-11678] Let ExecutionGraphCache store only ArchivedExecutionGraphs The ExecutionGraphCache now only stores ArchivedExecutionGraphs since it will never receive an ExecutionGraph from the RestfulGateway. - Remove ExecutionGraphCacheTest#testCacheInvalidationIfSuspended - Remove ExecutionGraphCacheTest#testCacheInvalidationIfSwitchToSuspended This closes #7769. --- .../rest/handler/legacy/ExecutionGraphCache.java | 55 +++++---------- .../handler/legacy/ExecutionGraphCacheTest.java | 81 ---------------------- .../runtime/webmonitor/TestingRestfulGateway.java | 1 - 3 files changed, 19 insertions(+), 118 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java index 7e9e6ee..f634a62 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java @@ -21,21 +21,21 @@ package org.apache.flink.runtime.rest.handler.legacy; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; -import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.webmonitor.RestfulGateway; import org.apache.flink.util.Preconditions; import java.io.Closeable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutionException; +import java.util.function.Function; import static org.apache.flink.util.Preconditions.checkNotNull; /** - * Cache for {@link AccessExecutionGraph} which are obtained from 
the Flink cluster. Every cache entry + * Cache for {@link ArchivedExecutionGraph} which are obtained from the Flink cluster. Every cache entry * has an associated time to live after which a new request will trigger the reloading of the - * {@link AccessExecutionGraph} from the cluster. + * {@link ArchivedExecutionGraph} from the cluster. */ public class ExecutionGraphCache implements Closeable { @@ -76,12 +76,15 @@ public class ExecutionGraphCache implements Closeable { * {@link AccessExecutionGraph} will be requested again after the refresh interval has passed * or if the graph could not be retrieved from the given gateway. * - * @param jobId identifying the {@link AccessExecutionGraph} to get - * @param restfulGateway to request the {@link AccessExecutionGraph} from - * @return Future containing the requested {@link AccessExecutionGraph} + * @param jobId identifying the {@link ArchivedExecutionGraph} to get + * @param restfulGateway to request the {@link ArchivedExecutionGraph} from + * @return Future containing the requested {@link ArchivedExecutionGraph} */ public CompletableFuture<AccessExecutionGraph> getExecutionGraph(JobID jobId, RestfulGateway restfulGateway) { + return getExecutionGraphInternal(jobId, restfulGateway).thenApply(Function.identity()); + } + private CompletableFuture<ArchivedExecutionGraph> getExecutionGraphInternal(JobID jobId, RestfulGateway restfulGateway) { Preconditions.checkState(running, "ExecutionGraphCache is no longer running"); while (true) { @@ -89,26 +92,12 @@ public class ExecutionGraphCache implements Closeable { final long currentTime = System.currentTimeMillis(); - if (oldEntry != null) { - if (currentTime < oldEntry.getTTL()) { - final CompletableFuture<AccessExecutionGraph> executionGraphFuture = oldEntry.getExecutionGraphFuture(); - if (executionGraphFuture.isDone() && !executionGraphFuture.isCompletedExceptionally()) { - - // TODO: Remove once we no longer request the actual ExecutionGraph from the JobManager but only the 
ArchivedExecutionGraph - try { - final AccessExecutionGraph executionGraph = executionGraphFuture.get(); - if (executionGraph.getState() != JobStatus.SUSPENDED) { - return executionGraphFuture; - } - // send a new request to get the ExecutionGraph from the new leader - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException("Could not retrieve ExecutionGraph from the orderly completed future. This should never happen.", e); - } - } else if (!executionGraphFuture.isDone()) { - return executionGraphFuture; - } - // otherwise it must be completed exceptionally + if (oldEntry != null && currentTime < oldEntry.getTTL()) { + final CompletableFuture<ArchivedExecutionGraph> executionGraphFuture = oldEntry.getExecutionGraphFuture(); + if (!executionGraphFuture.isCompletedExceptionally()) { + return executionGraphFuture; } + // otherwise it must be completed exceptionally } final ExecutionGraphEntry newEntry = new ExecutionGraphEntry(currentTime + timeToLive.toMilliseconds()); @@ -124,10 +113,10 @@ public class ExecutionGraphCache implements Closeable { } if (successfulUpdate) { - final CompletableFuture<? 
extends AccessExecutionGraph> executionGraphFuture = restfulGateway.requestJob(jobId, timeout); + final CompletableFuture<ArchivedExecutionGraph> executionGraphFuture = restfulGateway.requestJob(jobId, timeout); executionGraphFuture.whenComplete( - (AccessExecutionGraph executionGraph, Throwable throwable) -> { + (ArchivedExecutionGraph executionGraph, Throwable throwable) -> { if (throwable != null) { newEntry.getExecutionGraphFuture().completeExceptionally(throwable); @@ -135,12 +124,6 @@ public class ExecutionGraphCache implements Closeable { cachedExecutionGraphs.remove(jobId, newEntry); } else { newEntry.getExecutionGraphFuture().complete(executionGraph); - - // TODO: Remove once we no longer request the actual ExecutionGraph from the JobManager but only the ArchivedExecutionGraph - if (executionGraph.getState() == JobStatus.SUSPENDED) { - // remove the entry in case of suspension --> triggers new request when accessed next time - cachedExecutionGraphs.remove(jobId, newEntry); - } } }); @@ -171,7 +154,7 @@ public class ExecutionGraphCache implements Closeable { private static final class ExecutionGraphEntry { private final long ttl; - private final CompletableFuture<AccessExecutionGraph> executionGraphFuture; + private final CompletableFuture<ArchivedExecutionGraph> executionGraphFuture; ExecutionGraphEntry(long ttl) { this.ttl = ttl; @@ -182,7 +165,7 @@ public class ExecutionGraphCache implements Closeable { return ttl; } - public CompletableFuture<AccessExecutionGraph> getExecutionGraphFuture() { + public CompletableFuture<ArchivedExecutionGraph> getExecutionGraphFuture() { return executionGraphFuture; } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java index bd0e659..545b34a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java 
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java @@ -27,7 +27,6 @@ import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.executiongraph.ErrorInfo; -import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.messages.FlinkJobNotFoundException; import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; @@ -245,86 +244,6 @@ public class ExecutionGraphCacheTest extends TestLogger { } } - /** - * Tests that a cache entry is invalidated if the retrieved {@link AccessExecutionGraph} is in - * state {@link JobStatus#SUSPENDED}. - * - * <p>This test can be removed once we no longer request the actual {@link ExecutionGraph} from the - * JobManager. - */ - @Test - public void testCacheInvalidationIfSuspended() throws Exception { - final Time timeout = Time.milliseconds(100L); - final Time timeToLive = Time.hours(1L); - final JobID expectedJobId = new JobID(); - - final ArchivedExecutionGraph suspendedExecutionGraph = new ArchivedExecutionGraphBuilder().setState(JobStatus.SUSPENDED).build(); - final ConcurrentLinkedQueue<CompletableFuture<ArchivedExecutionGraph>> requestJobAnswers = new ConcurrentLinkedQueue<>(); - - requestJobAnswers.offer(CompletableFuture.completedFuture(suspendedExecutionGraph)); - requestJobAnswers.offer(CompletableFuture.completedFuture(expectedExecutionGraph)); - - final TestingRestfulGateway restfulGateway = TestingRestfulGateway.newBuilder() - .setRequestJobFunction( - jobId -> { - assertThat(jobId, Matchers.equalTo(expectedJobId)); - - return requestJobAnswers.poll(); - } - ) - .build(); - - try (ExecutionGraphCache executionGraphCache = new ExecutionGraphCache(timeout, timeToLive)) { - 
CompletableFuture<AccessExecutionGraph> executionGraphFuture = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway); - - assertEquals(suspendedExecutionGraph, executionGraphFuture.get()); - - executionGraphFuture = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway); - - assertEquals(expectedExecutionGraph, executionGraphFuture.get()); - } - } - - /** - * Tests that a cache entry is invalidated if the retrieved {@link AccessExecutionGraph} changes its - * state to {@link JobStatus#SUSPENDED}. - * - * <p>This test can be removed once we no longer request the actual {@link ExecutionGraph} from the - * JobManager. - */ - @Test - public void testCacheInvalidationIfSwitchToSuspended() throws Exception { - final Time timeout = Time.milliseconds(100L); - final Time timeToLive = Time.hours(1L); - final JobID expectedJobId = new JobID(); - - final SuspendableAccessExecutionGraph toBeSuspendedExecutionGraph = new SuspendableAccessExecutionGraph(expectedJobId); - - final CountingRestfulGateway restfulGateway = createCountingRestfulGateway( - expectedJobId, - CompletableFuture.completedFuture(toBeSuspendedExecutionGraph), - CompletableFuture.completedFuture(expectedExecutionGraph)); - - try (ExecutionGraphCache executionGraphCache = new ExecutionGraphCache(timeout, timeToLive)) { - CompletableFuture<AccessExecutionGraph> executionGraphFuture = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway); - - assertEquals(toBeSuspendedExecutionGraph, executionGraphFuture.get()); - - toBeSuspendedExecutionGraph.setJobStatus(JobStatus.SUSPENDED); - - // retrieve the same job from the cache again --> this should return it and invalidate the cache entry - executionGraphFuture = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway); - - assertEquals(expectedExecutionGraph, executionGraphFuture.get()); - - executionGraphFuture = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway); - - 
assertEquals(expectedExecutionGraph, executionGraphFuture.get()); - - assertThat(restfulGateway.getNumRequestJobCalls(), Matchers.equalTo(2)); - } - } - private CountingRestfulGateway createCountingRestfulGateway(JobID jobId, CompletableFuture<ArchivedExecutionGraph>... accessExecutionGraphs) { final ConcurrentLinkedQueue<CompletableFuture<ArchivedExecutionGraph>> queue = new ConcurrentLinkedQueue<>(Arrays.asList(accessExecutionGraphs)); return new CountingRestfulGateway( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java index de79e36..68abbf0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java @@ -23,7 +23,6 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.FutureUtils; -import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID;