[ https://issues.apache.org/jira/browse/FLINK-1442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14304875#comment-14304875 ]
ASF GitHub Bot commented on FLINK-1442:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/344#discussion_r24073173

    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala ---
    @@ -25,48 +25,82 @@ import org.apache.flink.runtime.jobgraph.JobID
     import org.apache.flink.runtime.messages.ArchiveMessages._
     import org.apache.flink.runtime.messages.JobManagerMessages._
    +
    +import scala.collection.mutable.LinkedHashMap
    +import scala.ref.SoftReference
    +
     class MemoryArchivist(private val max_entries: Int) extends Actor
       with ActorLogMessages with ActorLogging {

       /**
        * Map of execution graphs belonging to recently started jobs with the time stamp of the last
    -    * received job event.
    +    * received job event. The insert order is preserved through a LinkedHashMap.
        */
    -  val graphs = collection.mutable.HashMap[JobID, ExecutionGraph]()
    -  val lru = collection.mutable.Queue[JobID]()
    +  val graphs = LinkedHashMap[JobID, SoftReference[ExecutionGraph]]()

       override def receiveWithLogMessages: Receive = {
    +    /* Receive Execution Graph to archive */
         case ArchiveExecutionGraph(jobID, graph) => {
    -      graphs.update(jobID, graph)
    +      // wrap graph inside a soft reference
    +      graphs.update(jobID, new SoftReference(graph))
    +
    +      // clear all execution edges of the graph
    +      val iter = graph.getAllExecutionVertices().iterator()
    +      while (iter.hasNext) {
    +        iter.next().clearExecutionEdges()
    +      }
    +
           cleanup(jobID)
         }

         case RequestArchivedJobs => {
    -      sender ! ArchivedJobs(graphs.values)
    +      sender ! ArchivedJobs(getAllGraphs())
         }

         case RequestJob(jobID) => {
    -      graphs.get(jobID) match {
    -        case Some(graph) => sender ! JobFound(jobID, graph)
    -        case None => sender ! JobNotFound(jobID)
    +      getGraph(jobID) match {
    +        case graph: ExecutionGraph => sender ! JobFound(jobID, graph)
    +        case _ => sender ! JobNotFound(jobID)
           }
         }

         case RequestJobStatus(jobID) => {
    -      graphs.get(jobID) match {
    -        case Some(eg) => sender ! CurrentJobStatus(jobID, eg.getState)
    -        case None => sender ! JobNotFound(jobID)
    +      getGraph(jobID) match {
    +        case graph: ExecutionGraph => sender ! CurrentJobStatus(jobID, graph.getState)
    +        case _ => sender ! JobNotFound(jobID)
           }
         }
       }

    -  def cleanup(jobID: JobID): Unit = {
    -    if (!lru.contains(jobID)) {
    -      lru.enqueue(jobID)
    +  /**
    +   * Gets all graphs that have not been garbage collected.
    +   * @return An iterable with all valid ExecutionGraphs
    +   */
    +  def getAllGraphs() = graphs.values.flatMap(ref => ref.get match {
    +    case Some(graph) => Seq(graph)
    +    case _ => Seq()
    +  })
    --- End diff --

    Did you want to do ```graphs.values.flatMap { _.get }``` instead? It returns all the archived ```ExecutionGraph```s. It would also help readers if you added an explicit return type to ```getAllGraphs```.
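For context on the suggestion above: ```scala.ref.SoftReference.get``` returns an ```Option```, so ```flatMap { _.get }``` keeps exactly those graphs that the garbage collector has not yet reclaimed. A minimal, self-contained sketch of the idea; the ```Graph``` case class and the sample entries are made up for illustration and are not Flink code:

```scala
import scala.collection.mutable.LinkedHashMap
import scala.ref.SoftReference

object FlatMapOverSoftRefs {
  // Hypothetical stand-in for ExecutionGraph, purely for illustration.
  case class Graph(name: String)

  def main(args: Array[String]): Unit = {
    val graphs = LinkedHashMap[Int, SoftReference[Graph]]()
    graphs.update(1, new SoftReference(Graph("wordcount")))
    graphs.update(2, new SoftReference(Graph("pagerank")))

    // SoftReference.get returns Option[Graph]; flatMap flattens the Options
    // away and silently drops entries the GC has already cleared.
    val live: Iterable[Graph] = graphs.values.flatMap(_.get)
    println(live) // both graphs, unless memory pressure cleared them
  }
}
```

Compared to the explicit ```match```/```Seq``` version in the diff, this form relies on the standard ```Option```-to-collection conversion and behaves the same way: entries whose referent has been collected simply disappear from the result.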
> Archived Execution Graph consumes too much memory
> -------------------------------------------------
>
>                 Key: FLINK-1442
>                 URL: https://issues.apache.org/jira/browse/FLINK-1442
>             Project: Flink
>          Issue Type: Bug
>          Components: JobManager
>    Affects Versions: 0.9
>            Reporter: Stephan Ewen
>            Assignee: Max Michels
>
> The JobManager archives the execution graphs for later analysis of jobs.
> These graphs may consume a lot of memory.
> Especially the execution edges in all2all connection patterns are extremely
> numerous and add up in memory consumption. The execution edges connect all
> parallel tasks, so for an all2all pattern between n and m tasks, there are
> n*m edges. For a parallelism of several hundred tasks, this can easily reach
> 100k objects and more, each with its own set of metadata.
> I propose the following to solve that:
> 1. Clear all execution edges from the graph (the majority of the memory
> consumers) when it is given to the archiver.
> 2. Keep the map/list of the archived graphs behind soft references, so they
> will be removed under memory pressure before the JVM crashes (see the sketch
> below). That may remove graphs from the history early, but that is much
> preferable to the JVM crashing, in which case the graphs are lost as well...
> 3. Long term: the graphs should be archived somewhere else. Something like
> the history server used by Hadoop and Hive would be a good idea.
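For point 2 of the proposal, here is a minimal sketch of a soft-reference archive combined with the insertion-order size cap the pull request uses. All names (```SoftArchive```, ```maxEntries```) are invented for illustration and do not reflect the actual Flink implementation:

```scala
import scala.collection.mutable.LinkedHashMap
import scala.ref.SoftReference

// Sketch: an archive that caps the number of entries and additionally lets
// the GC reclaim values under memory pressure via SoftReference.
class SoftArchive[K, V <: AnyRef](maxEntries: Int) {
  // LinkedHashMap preserves insertion order, so the head is the oldest entry.
  private val entries = LinkedHashMap[K, SoftReference[V]]()

  def put(key: K, value: V): Unit = {
    entries.update(key, new SoftReference(value))
    // Hard cap: evict the oldest entries once the limit is exceeded.
    while (entries.size > maxEntries) {
      entries.remove(entries.head._1)
    }
  }

  // Returns None if the key was evicted or the GC reclaimed the value.
  def get(key: K): Option[V] = entries.get(key).flatMap(_.get)
}
```

The soft references act as a second, GC-driven eviction layer on top of the explicit size cap: under memory pressure the JVM may clear archived graphs before it would otherwise run out of heap.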