[FLINK-1442] [runtime] Reduce memory consumption of archived execution graph
This closes #344 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9d181a86 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9d181a86 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9d181a86 Branch: refs/heads/master Commit: 9d181a86a0870204113271b6e45f611cba04fc7d Parents: 91f9bfc Author: Max <[email protected]> Authored: Mon Jan 26 19:31:47 2015 +0100 Committer: Stephan Ewen <[email protected]> Committed: Thu Feb 5 12:17:15 2015 +0100 ---------------------------------------------------------------------- .../runtime/executiongraph/ExecutionVertex.java | 9 ++- .../flink/runtime/jobmanager/JobManager.scala | 4 ++ .../runtime/jobmanager/MemoryArchivist.scala | 68 +++++++++++++++----- .../testingUtils/TestingMemoryArchivist.scala | 9 +-- 4 files changed, 68 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9d181a86/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index a9a5434..8812569 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -69,7 +69,7 @@ public class ExecutionVertex implements Serializable { private transient final IntermediateResultPartition[] resultPartitions; - private transient final ExecutionEdge[][] inputEdges; + private transient ExecutionEdge[][] inputEdges; private final int subTaskIndex; @@ -447,6 +447,13 @@ public class ExecutionVertex implements Serializable { return getTaskName() + " (" +
(getParallelSubtaskIndex()+1) + '/' + getTotalNumberOfParallelSubtasks() + ')'; } + /** + * Clears the input edges of this vertex (sets them to null) so they can be garbage collected. + */ + public void clearExecutionEdges() { + inputEdges = null; + } + @Override public String toString() { return getSimpleName(); http://git-wip-us.apache.org/repos/asf/flink/blob/9d181a86/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index f6dbab3..c780589 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -453,6 +453,10 @@ class JobManager(val configuration: Configuration) throw new RuntimeException("Received unknown message " + message) } + /** + * Removes the job and sends it to the MemoryArchivist + * @param jobID ID of the job to remove and archive + */ private def removeJob(jobID: JobID): Unit = { currentJobs.remove(jobID) match { case Some((eg, _)) => archive !
ArchiveExecutionGraph(jobID, eg) http://git-wip-us.apache.org/repos/asf/flink/blob/9d181a86/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala index 60aa4b1..5ca8fb3 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala @@ -25,48 +25,82 @@ import org.apache.flink.runtime.jobgraph.JobID import org.apache.flink.runtime.messages.ArchiveMessages._ import org.apache.flink.runtime.messages.JobManagerMessages._ +import scala.collection.mutable.LinkedHashMap +import scala.ref.SoftReference + class MemoryArchivist(private val max_entries: Int) extends Actor with ActorLogMessages with ActorLogging { /** * Map of execution graphs belonging to recently started jobs with the time stamp of the last - * received job event. + * received job event. The insert order is preserved through a LinkedHashMap. */ - val graphs = collection.mutable.HashMap[JobID, ExecutionGraph]() - val lru = collection.mutable.Queue[JobID]() + val graphs = LinkedHashMap[JobID, SoftReference[ExecutionGraph]]() override def receiveWithLogMessages: Receive = { + /* Receive Execution Graph to archive */ case ArchiveExecutionGraph(jobID, graph) => { - graphs.update(jobID, graph) + // wrap graph inside a soft reference + graphs.update(jobID, new SoftReference(graph)) + + // clear all execution edges of the graph + val iter = graph.getAllExecutionVertices().iterator() + while (iter.hasNext) { + iter.next().clearExecutionEdges() + } + cleanup(jobID) } case RequestArchivedJobs => { - sender !
ArchivedJobs(getAllGraphs()) } case RequestJob(jobID) => { - graphs.get(jobID) match { - case Some(graph) => sender ! JobFound(jobID, graph) - case None => sender ! JobNotFound(jobID) + getGraph(jobID) match { + case graph: ExecutionGraph => sender ! JobFound(jobID, graph) + case _ => sender ! JobNotFound(jobID) } } case RequestJobStatus(jobID) => { - graphs.get(jobID) match { - case Some(eg) => sender ! CurrentJobStatus(jobID, eg.getState) - case None => sender ! JobNotFound(jobID) + getGraph(jobID) match { + case graph: ExecutionGraph => sender ! CurrentJobStatus(jobID, graph.getState) + case _ => sender ! JobNotFound(jobID) } } } - def cleanup(jobID: JobID): Unit = { - if (!lru.contains(jobID)) { - lru.enqueue(jobID) + /** + * Gets all graphs that have not been garbage collected. + * @return An iterable with all valid ExecutionGraphs + */ + def getAllGraphs(): Iterable[ExecutionGraph] = { + // a soft reference yields None once its graph has been garbage collected + graphs.values.flatMap(_.get) + } + + /** + * Gets a graph with a jobID if it has not been garbage collected. + * @param jobID ID of the job whose archived graph is requested + * @return the ExecutionGraph, or null if the job is unknown or its graph was garbage collected + */ + def getGraph(jobID: JobID): ExecutionGraph = { + graphs.get(jobID) match { + case Some(softRef) => softRef.get.orNull + case None => null } + } - while (lru.size > max_entries) { - val removedJobID = lru.dequeue() - graphs.remove(removedJobID) + /** + * Evicts the oldest archived ExecutionGraphs (in insertion order) + * while more than max_entries are stored.
+ * @param jobID ID of the job that was just archived (not needed for eviction) + */ + private def cleanup(jobID: JobID): Unit = { + while (graphs.size > max_entries) { + // LinkedHashMap iterates in insertion order, so the head is the oldest entry + val (oldestJobID, _) = graphs.head + graphs.remove(oldestJobID) } } } http://git-wip-us.apache.org/repos/asf/flink/blob/9d181a86/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala index 71d0feb..ca5f7e4 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala @@ -31,10 +31,11 @@ trait TestingMemoryArchivist extends ActorLogMessages { def receiveTestingMessages: Receive = { case RequestExecutionGraph(jobID) => - graphs.get(jobID) match { - case Some(executionGraph) => sender ! ExecutionGraphFound(jobID, executionGraph) - case None => sender ! ExecutionGraphNotFound(jobID) + val executionGraph = getGraph(jobID) + if (executionGraph != null) { + sender ! ExecutionGraphFound(jobID, executionGraph) + } else { + sender ! ExecutionGraphNotFound(jobID) } - } }
