[FLINK-1442] [runtime] Reduce memory consumption of archived execution graph

This closes #344


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9d181a86
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9d181a86
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9d181a86

Branch: refs/heads/master
Commit: 9d181a86a0870204113271b6e45f611cba04fc7d
Parents: 91f9bfc
Author: Max <[email protected]>
Authored: Mon Jan 26 19:31:47 2015 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Thu Feb 5 12:17:15 2015 +0100

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionVertex.java |  9 ++-
 .../flink/runtime/jobmanager/JobManager.scala   |  4 ++
 .../runtime/jobmanager/MemoryArchivist.scala    | 68 +++++++++++++++-----
 .../testingUtils/TestingMemoryArchivist.scala   |  9 +--
 4 files changed, 68 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9d181a86/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index a9a5434..8812569 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -69,7 +69,7 @@ public class ExecutionVertex implements Serializable {
        
        private transient final IntermediateResultPartition[] resultPartitions;
        
-       private transient final ExecutionEdge[][] inputEdges;
+       private transient ExecutionEdge[][] inputEdges;
        
        private final int subTaskIndex;
        
@@ -447,6 +447,13 @@ public class ExecutionVertex implements Serializable {
                return getTaskName() + " (" + (getParallelSubtaskIndex()+1) + 
'/' + getTotalNumberOfParallelSubtasks() + ')';
        }
 
+       /**
+        * Clears all input edges of this ExecutionVertex.
+        */
+       public void clearExecutionEdges() {
+               inputEdges = null;
+       }
+
        @Override
        public String toString() {
                return getSimpleName();

http://git-wip-us.apache.org/repos/asf/flink/blob/9d181a86/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index f6dbab3..c780589 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -453,6 +453,10 @@ class JobManager(val configuration: Configuration)
     throw new RuntimeException("Received unknown message " + message)
   }
 
+  /**
+   * Removes the job and sends it to the MemoryArchivist
+   * @param jobID ID of the job to remove and archive
+   */
   private def removeJob(jobID: JobID): Unit = {
     currentJobs.remove(jobID) match {
       case Some((eg, _)) => archive ! ArchiveExecutionGraph(jobID, eg)

http://git-wip-us.apache.org/repos/asf/flink/blob/9d181a86/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
index 60aa4b1..5ca8fb3 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
@@ -25,48 +25,82 @@ import org.apache.flink.runtime.jobgraph.JobID
 import org.apache.flink.runtime.messages.ArchiveMessages._
 import org.apache.flink.runtime.messages.JobManagerMessages._
 
+import scala.collection.mutable.LinkedHashMap
+import scala.ref.SoftReference
+
 class MemoryArchivist(private val max_entries: Int) extends Actor with 
ActorLogMessages with
 ActorLogging {
   /**
    * Map of execution graphs belonging to recently started jobs with the time 
stamp of the last
-   * received job event.
+   * received job event. The insert order is preserved through a LinkedHashMap.
    */
-  val graphs = collection.mutable.HashMap[JobID, ExecutionGraph]()
-  val lru = collection.mutable.Queue[JobID]()
+  val graphs = LinkedHashMap[JobID, SoftReference[ExecutionGraph]]()
 
   override def receiveWithLogMessages: Receive = {
+    /* Receive Execution Graph to archive */
     case ArchiveExecutionGraph(jobID, graph) => {
-      graphs.update(jobID, graph)
+      // wrap graph inside a soft reference
+      graphs.update(jobID, new SoftReference(graph))
+
+      // clear all execution edges of the graph
+      val iter = graph.getAllExecutionVertices().iterator()
+      while (iter.hasNext) {
+        iter.next().clearExecutionEdges()
+      }
+
       cleanup(jobID)
     }
 
     case RequestArchivedJobs => {
-      sender ! ArchivedJobs(graphs.values)
+      sender ! ArchivedJobs(getAllGraphs())
     }
 
     case RequestJob(jobID) => {
-      graphs.get(jobID) match {
-        case Some(graph) => sender ! JobFound(jobID, graph)
-        case None => sender ! JobNotFound(jobID)
+      getGraph(jobID) match {
+        case graph: ExecutionGraph => sender ! JobFound(jobID, graph)
+        case _ => sender ! JobNotFound(jobID)
       }
     }
 
     case RequestJobStatus(jobID) => {
-      graphs.get(jobID) match {
-        case Some(eg) => sender ! CurrentJobStatus(jobID, eg.getState)
-        case None => sender ! JobNotFound(jobID)
+      getGraph(jobID) match {
+        case graph: ExecutionGraph => sender ! CurrentJobStatus(jobID, 
graph.getState)
+        case _ => sender ! JobNotFound(jobID)
       }
     }
   }
 
-  def cleanup(jobID: JobID): Unit = {
-    if (!lru.contains(jobID)) {
-      lru.enqueue(jobID)
+  /**
+   * Gets all graphs that have not been garbage collected.
+   * @return An iterable with all valid ExecutionGraphs
+   */
+  def getAllGraphs() = graphs.values.flatMap(ref => ref.get match {
+    case Some(graph) => Seq(graph)
+    case _ => Seq()
+  })
+
+  /**
+   * Gets a graph with a jobID if it has not been garbage collected.
+   * @param jobID ID of the job whose ExecutionGraph should be retrieved
+   * @return ExecutionGraph or null
+   */
+  def getGraph(jobID: JobID) = graphs.get(jobID) match {
+    case Some(softRef) => softRef.get match {
+      case Some(graph) => graph
+      case None => null
     }
+  }
 
-    while (lru.size > max_entries) {
-      val removedJobID = lru.dequeue()
-      graphs.remove(removedJobID)
+  /**
+   * Removes the oldest archived ExecutionGraphs
+   * if more than max_entries graphs are stored.
+   * @param jobID ID of the most recently archived job
+   */
+  private def cleanup(jobID: JobID): Unit = {
+    while (graphs.size > max_entries) {
+      // get first graph inserted
+      val (jobID, value) = graphs.iterator.next()
+      graphs.remove(jobID)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9d181a86/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
index 71d0feb..ca5f7e4 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
@@ -31,10 +31,11 @@ trait TestingMemoryArchivist extends ActorLogMessages {
 
   def receiveTestingMessages: Receive = {
     case RequestExecutionGraph(jobID) =>
-      graphs.get(jobID) match {
-        case Some(executionGraph) => sender ! ExecutionGraphFound(jobID, 
executionGraph)
-        case None => sender ! ExecutionGraphNotFound(jobID)
+      val executionGraph = getGraph(jobID)
+      if (executionGraph != null) {
+        sender ! ExecutionGraphFound(jobID, executionGraph)
+      } else {
+        sender ! ExecutionGraphNotFound(jobID)
       }
-
   }
 }

Reply via email to