[ https://issues.apache.org/jira/browse/FLINK-1442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14304846#comment-14304846 ]

ASF GitHub Bot commented on FLINK-1442:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/344#discussion_r24072128
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala ---
    @@ -25,48 +25,82 @@ import org.apache.flink.runtime.jobgraph.JobID
     import org.apache.flink.runtime.messages.ArchiveMessages._
     import org.apache.flink.runtime.messages.JobManagerMessages._
     
    +import scala.collection.mutable.LinkedHashMap
    +import scala.ref.SoftReference
    +
     class MemoryArchivist(private val max_entries: Int) extends Actor with ActorLogMessages with
     ActorLogging {
       /**
        * Map of execution graphs belonging to recently started jobs with the time stamp of the last
    -   * received job event.
    +   * received job event. The insert order is preserved through a LinkedHashMap.
        */
    -  val graphs = collection.mutable.HashMap[JobID, ExecutionGraph]()
    -  val lru = collection.mutable.Queue[JobID]()
    +  val graphs = LinkedHashMap[JobID, SoftReference[ExecutionGraph]]()
     
       override def receiveWithLogMessages: Receive = {
    +    /* Receive Execution Graph to archive */
         case ArchiveExecutionGraph(jobID, graph) => {
    -      graphs.update(jobID, graph)
    +      // wrap graph inside a soft reference
    +      graphs.update(jobID, new SoftReference(graph))
    +
    +      // clear all execution edges of the graph
    +      val iter = graph.getAllExecutionVertices().iterator()
    +      while (iter.hasNext) {
    +        iter.next().clearExecutionEdges()
    +      }
    +
           cleanup(jobID)
         }
     
         case RequestArchivedJobs => {
    -      sender ! ArchivedJobs(graphs.values)
    +      sender ! ArchivedJobs(getAllGraphs())
         }
     
         case RequestJob(jobID) => {
    -      graphs.get(jobID) match {
    -        case Some(graph) => sender ! JobFound(jobID, graph)
    -        case None => sender ! JobNotFound(jobID)
    +      getGraph(jobID) match {
    +        case graph: ExecutionGraph => sender ! JobFound(jobID, graph)
    +        case _ => sender ! JobNotFound(jobID)
           }
         }
     
         case RequestJobStatus(jobID) => {
    -      graphs.get(jobID) match {
    -        case Some(eg) => sender ! CurrentJobStatus(jobID, eg.getState)
    -        case None => sender ! JobNotFound(jobID)
    +      getGraph(jobID) match {
    +        case graph: ExecutionGraph => sender ! CurrentJobStatus(jobID, graph.getState)
    +        case _ => sender ! JobNotFound(jobID)
           }
         }
       }
     
    -  def cleanup(jobID: JobID): Unit = {
    -    if (!lru.contains(jobID)) {
    -      lru.enqueue(jobID)
    +  /**
    +   * Gets all graphs that have not been garbage collected.
    +   * @return An iterable with all valid ExecutionGraphs
    +   */
    +  def getAllGraphs() = graphs.values.flatMap(ref => ref.get match {
    +    case Some(graph) => Seq(graph)
    +    case _ => Seq()
    +  })
    +
    +  /**
    +   * Gets a graph with a jobID if it has not been garbage collected.
    +   * @param jobID
    +   * @return ExecutionGraph or null
    +   */
    +  def getGraph(jobID: JobID) = graphs.get(jobID) match {
    --- End diff ---
    
    Why are we doing that? Why not work with the Option type? I don't like null.
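
    Purely as a minimal sketch of the Option-based alternative (not taken from
    the pull request), assuming graphs is the
    LinkedHashMap[JobID, SoftReference[ExecutionGraph]] shown in the diff above:

        def getGraph(jobID: JobID): Option[ExecutionGraph] =
          graphs.get(jobID).flatMap(_.get)

    Callers would then match on Some(graph) / None, as the pre-patch code did,
    instead of checking for null; getAllGraphs could likewise shrink to
    graphs.values.flatMap(_.get).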


> Archived Execution Graph consumes too much memory
> -------------------------------------------------
>
>                 Key: FLINK-1442
>                 URL: https://issues.apache.org/jira/browse/FLINK-1442
>             Project: Flink
>          Issue Type: Bug
>          Components: JobManager
>    Affects Versions: 0.9
>            Reporter: Stephan Ewen
>            Assignee: Max Michels
>
> The JobManager archives the execution graphs, for analysis of jobs. The 
> graphs may consume a lot of memory.
> Especially the execution edges in all2all connection patterns are extremely 
> numerous and add up in memory consumption.
> The execution edges connect all parallel tasks. So for an all2all pattern 
> between n and m tasks, there are n*m edges. For a parallelism of several 
> hundred tasks, this can easily reach 100k objects and more, each with a set 
> of metadata.
> I propose the following to solve that:
> 1. Clear all execution edges from the graph (the majority of the memory 
> consumers) when it is given to the archiver.
> 2. Have the map/list of the archived graphs behind a soft reference, so it 
> will be removed under memory pressure before the JVM crashes (see the sketch 
> below). That may remove graphs from the history early, but that is much 
> preferable to the JVM crashing, in which case the graph is lost as well...
> 3. Long term: The graph should be archived somewhere else. Something like the 
> History server used by Hadoop and Hive would be a good idea.
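
A minimal, self-contained sketch of the soft-reference behavior proposed in 
point 2 (illustrative only, not taken from the patch): the archived value stays 
reachable through SoftReference.get until the garbage collector needs the 
memory, after which get returns None instead of the JVM crashing.

    import scala.ref.SoftReference

    object SoftRefDemo extends App {
      // Stand-in payload for an archived ExecutionGraph; the size is arbitrary.
      val archived = new SoftReference(new Array[Byte](64 * 1024 * 1024))

      archived.get match {
        case Some(payload) => println(s"still archived: ${payload.length} bytes")
        case None          => println("dropped under memory pressure")
      }
    }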



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
