[ https://issues.apache.org/jira/browse/FLINK-1442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14304867#comment-14304867 ]
ASF GitHub Bot commented on FLINK-1442:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/344#discussion_r24072836

    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala ---
    @@ -25,48 +25,82 @@ import org.apache.flink.runtime.jobgraph.JobID
     import org.apache.flink.runtime.messages.ArchiveMessages._
     import org.apache.flink.runtime.messages.JobManagerMessages._
     
    +import scala.collection.mutable.LinkedHashMap
    +import scala.ref.SoftReference
    +
     class MemoryArchivist(private val max_entries: Int) extends Actor with ActorLogMessages with ActorLogging {
       /**
        * Map of execution graphs belonging to recently started jobs with the time stamp of the last
    -   * received job event.
    +   * received job event. The insert order is preserved through a LinkedHashMap.
        */
    -  val graphs = collection.mutable.HashMap[JobID, ExecutionGraph]()
    -  val lru = collection.mutable.Queue[JobID]()
    +  val graphs = LinkedHashMap[JobID, SoftReference[ExecutionGraph]]()
     
       override def receiveWithLogMessages: Receive = {
    +    /* Receive Execution Graph to archive */
         case ArchiveExecutionGraph(jobID, graph) => {
    -      graphs.update(jobID, graph)
    +      // wrap graph inside a soft reference
    +      graphs.update(jobID, new SoftReference(graph))
    +
    +      // clear all execution edges of the graph
    +      val iter = graph.getAllExecutionVertices().iterator()
    +      while (iter.hasNext) {
    +        iter.next().clearExecutionEdges()
    +      }
    --- End diff --

    Why not use the power of Scala:

        graph.getAllExecutionVertices.asScala.foreach { _.clearExecutionEdges }

    with import scala.collection.JavaConverters.iterableAsScalaIterableConverter in scope.
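The reviewer's suggestion can be tried in isolation. In the sketch below, Vertex and Graph are hypothetical stand-ins for Flink's ExecutionVertex and ExecutionGraph. They model only getAllExecutionVertices() and clearExecutionEdges() and are not Flink's code. The sketch compares the explicit iterator loop from the diff with the asScala/foreach form from the review. It imports JavaConverters._ instead of the single converter named in the comment.

```scala
import scala.collection.JavaConverters._

// Hypothetical stand-in for Flink's ExecutionVertex: it only holds a list of "edges".
class Vertex(numEdges: Int) {
  private val edges = new java.util.ArrayList[String]()
  (0 until numEdges).foreach(i => edges.add(s"edge-$i"))

  def edgeCount: Int = edges.size
  def clearExecutionEdges(): Unit = edges.clear()
}

// Hypothetical stand-in for Flink's ExecutionGraph. Like the Java API,
// it exposes its vertices as a java.lang.Iterable.
class Graph(vertices: Seq[Vertex]) {
  def getAllExecutionVertices(): java.lang.Iterable[Vertex] = vertices.asJava
}

object EdgeClearing {
  // Variant from the diff: walk the Java iterator by hand.
  def clearWithIterator(graph: Graph): Unit = {
    val iter = graph.getAllExecutionVertices().iterator()
    while (iter.hasNext) {
      iter.next().clearExecutionEdges()
    }
  }

  // Variant suggested in the review: wrap the Java Iterable and use foreach.
  def clearWithAsScala(graph: Graph): Unit =
    graph.getAllExecutionVertices().asScala.foreach(_.clearExecutionEdges())
}
```

Both variants visit the same vertices. asScala returns a thin wrapper view over the Java Iterable, so the vertex collection is not copied.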
> Archived Execution Graph consumes too much memory
> -------------------------------------------------
>
>                 Key: FLINK-1442
>                 URL: https://issues.apache.org/jira/browse/FLINK-1442
>             Project: Flink
>          Issue Type: Bug
>          Components: JobManager
>    Affects Versions: 0.9
>            Reporter: Stephan Ewen
>            Assignee: Max Michels
>
> The JobManager archives the execution graphs for analysis of jobs. The
> graphs may consume a lot of memory.
> The execution edges in all2all connection patterns in particular are extremely
> numerous and add up in memory consumption.
> The execution edges connect all parallel tasks. So for an all2all pattern
> between n and m tasks, there are n*m edges. For a parallelism of several
> hundred tasks, this can easily reach 100k objects and more, each with a set of
> metadata.
> I propose the following to solve that:
> 1. Clear all execution edges from the graph (the majority of the memory
> consumers) when it is given to the archiver.
> 2. Keep the map/list of the archived graphs behind a soft reference, so it
> will be removed under memory pressure before the JVM crashes. That may remove
> graphs from the history early, but is much preferable to the JVM crashing, in
> which case the graph is lost as well...
> 3. Long term: The graph should be archived somewhere else. Something like the
> History server used by Hadoop and Hive would be a good idea.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
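Item 2 of the proposal and the LinkedHashMap/SoftReference pair used in the diff together point toward a bounded, memory-sensitive archive. The following is a minimal sketch under several assumptions:

- The class name SoftArchive and its methods are hypothetical.
- The oldest entries are evicted once maxEntries is exceeded. The actual eviction logic of MemoryArchivist is not visible in the diff excerpt.
- Like the diff, the sketch wraps each value individually in scala.ref.SoftReference rather than wrapping the whole map.

```scala
import scala.collection.mutable.LinkedHashMap
import scala.ref.SoftReference

// Hypothetical bounded archive: insertion-ordered, at most maxEntries entries,
// each value held softly so the GC may reclaim it under memory pressure.
class SoftArchive[K, V <: AnyRef](maxEntries: Int) {
  require(maxEntries > 0, "maxEntries must be positive")

  private val entries = LinkedHashMap[K, SoftReference[V]]()

  def put(key: K, value: V): Unit = {
    // Removing first makes a re-archived key count as the newest entry.
    entries.remove(key)
    entries.update(key, new SoftReference(value))
    // The head of a LinkedHashMap is the oldest insertion; evict until within bounds.
    while (entries.size > maxEntries) {
      entries.remove(entries.head._1)
    }
  }

  // None if the key was never archived, was evicted, or its value was reclaimed by the GC.
  def get(key: K): Option[V] = entries.get(key).flatMap(_.get)

  def keys: List[K] = entries.keys.toList
}
```

In this design, a reclaimed value still occupies a map slot until it is evicted, and get treats "evicted" and "collected" the same way. Callers therefore have to treat the archive as best-effort history.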