[
https://issues.apache.org/jira/browse/FLINK-7668?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16182774#comment-16182774
]
ASF GitHub Bot commented on FLINK-7668:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4728#discussion_r141389329
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Closeable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Cache for {@link AccessExecutionGraph} which are obtained from the Flink cluster. Every cache entry
+ * has an associated time to live after which a new request will trigger the reloading of the
+ * {@link AccessExecutionGraph} from the cluster.
+ */
+public class ExecutionGraphCache implements Closeable {
+
+ private final Time timeout;
+
+ private final Time timeToLive;
+
+ private final ConcurrentHashMap<JobID, ExecutionGraphEntry> cachedExecutionGraphs;
+
+ private volatile boolean running = true;
+
+ public ExecutionGraphCache(
+ Time timeout,
+ Time timeToLive) {
+ this.timeout = checkNotNull(timeout);
+ this.timeToLive = checkNotNull(timeToLive);
+
+ cachedExecutionGraphs = new ConcurrentHashMap<>(4);
+ }
+
+ @Override
+ public void close() {
+ running = false;
+
+ // clear all cached AccessExecutionGraphs
+ cachedExecutionGraphs.clear();
+ }
+
+ /**
+ * Gets the {@link AccessExecutionGraph} for the given {@link JobID} and caches it. The
+ * {@link AccessExecutionGraph} will be requested again after the refresh interval has passed
+ * or if the graph could not be retrieved from the given gateway.
+ *
+ * @param jobId identifying the {@link AccessExecutionGraph} to get
+ * @param restfulGateway to request the {@link AccessExecutionGraph} from
+ * @return Future containing the requested {@link AccessExecutionGraph}
+ */
+ public CompletableFuture<AccessExecutionGraph> getExecutionGraph(JobID jobId, RestfulGateway restfulGateway) {
+
+ Preconditions.checkState(running, "ExecutionGraphCache is no longer running");
+
+ while (true) {
+ final ExecutionGraphEntry oldEntry = cachedExecutionGraphs.get(jobId);
+
+ final long currentTime = System.currentTimeMillis();
+
+ if (oldEntry != null) {
+ if (currentTime < oldEntry.getTTL()) {
+ if (oldEntry.getExecutionGraphFuture().isDone() && !oldEntry.getExecutionGraphFuture().isCompletedExceptionally()) {
+
+ // TODO: Remove once we no longer request the actual ExecutionGraph from the JobManager but only the ArchivedExecutionGraph
--- End diff --
yes
> Add AccessExecutionGraph refresh interval to ExecutionGraphHolder
> -----------------------------------------------------------------
>
> Key: FLINK-7668
> URL: https://issues.apache.org/jira/browse/FLINK-7668
> Project: Flink
> Issue Type: Sub-task
> Components: REST
> Affects Versions: 1.4.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Labels: flip-6
>
> Once we support an offline {{AccessExecutionGraph}} implementation (see
> FLINK-7667), we should add a refresh interval to the {{ExecutionGraphHolder}},
> after which the {{AccessExecutionGraph}} is retrieved again from the
> {{JobMaster}}.
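
The refresh-interval idea described in the issue can be illustrated with a small stand-alone sketch. This is not the code from the pull request: the class name `TtlFutureCache`, the `loader` function, and the injectable `clock` are hypothetical and exist only for illustration. It assumes that a cached future is reused while it is younger than the time to live and has not failed, and that otherwise exactly one caller triggers a reload.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.LongSupplier;

/** Hypothetical sketch of a time-to-live cache for futures; not Flink's ExecutionGraphCache. */
final class TtlFutureCache<K, V> {

    /** A cached future plus the timestamp from which it counts as stale. */
    private static final class Entry<V> {
        final CompletableFuture<V> future;
        final long expiresAtMillis;

        Entry(CompletableFuture<V> future, long expiresAtMillis) {
            this.future = future;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final ConcurrentHashMap<K, Entry<V>> entries = new ConcurrentHashMap<>();
    private final long timeToLiveMillis;
    private final LongSupplier clock; // assumption: injected so that tests can control time

    TtlFutureCache(long timeToLiveMillis, LongSupplier clock) {
        this.timeToLiveMillis = timeToLiveMillis;
        this.clock = clock;
    }

    /** Assumes the loader reports failures through the returned future, not by throwing. */
    CompletableFuture<V> get(K key, Function<K, CompletableFuture<V>> loader) {
        while (true) {
            final Entry<V> oldEntry = entries.get(key);
            final long now = clock.getAsLong();

            // Reuse the entry while it is fresh and has not failed.
            if (oldEntry != null
                    && now < oldEntry.expiresAtMillis
                    && !oldEntry.future.isCompletedExceptionally()) {
                return oldEntry.future;
            }

            // Install a placeholder first so that only the winning thread calls the loader.
            final CompletableFuture<V> placeholder = new CompletableFuture<>();
            final Entry<V> newEntry = new Entry<>(placeholder, now + timeToLiveMillis);

            final boolean installed = (oldEntry == null)
                    ? entries.putIfAbsent(key, newEntry) == null
                    : entries.replace(key, oldEntry, newEntry);

            if (installed) {
                loader.apply(key).whenComplete((value, failure) -> {
                    if (failure != null) {
                        placeholder.completeExceptionally(failure);
                        // Drop the failed entry so that the next request retries.
                        entries.remove(key, newEntry);
                    } else {
                        placeholder.complete(value);
                    }
                });
                return placeholder;
            }
            // Another thread changed the entry concurrently; re-read and try again.
        }
    }
}
```

The compare-and-swap style update (`putIfAbsent` / `replace`) mirrors the `while (true)` loop visible in the diff, but the details here are a guess at one reasonable design, not a description of the merged implementation.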