[FLINK-1442] [runtime] Minor code cleanups in MemoryArchivist
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/56b7f85b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/56b7f85b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/56b7f85b Branch: refs/heads/master Commit: 56b7f85b4f6d522765df19a9710a098092ccde56 Parents: 8ae0dc2 Author: Stephan Ewen <[email protected]> Authored: Wed Feb 4 15:07:02 2015 +0100 Committer: Stephan Ewen <[email protected]> Committed: Thu Feb 5 12:17:15 2015 +0100 ---------------------------------------------------------------------- .../runtime/jobmanager/MemoryArchivist.scala | 29 +++++++++----------- .../testingUtils/TestingMemoryArchivist.scala | 8 +++--- 2 files changed, 17 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/56b7f85b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala index 28e960d..88dc927 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala @@ -28,8 +28,9 @@ import org.apache.flink.runtime.messages.JobManagerMessages._ import scala.collection.mutable.LinkedHashMap import scala.ref.SoftReference -class MemoryArchivist(private val max_entries: Int) extends Actor with ActorLogMessages with -ActorLogging { +class MemoryArchivist(private val max_entries: Int) extends Actor + with ActorLogMessages with ActorLogging { + /** * Map of execution graphs belonging to recently started jobs with the time stamp of the last * received job event. The insert order is preserved through a LinkedHashMap. @@ -37,6 +38,7 @@ ActorLogging { val graphs = LinkedHashMap[JobID, SoftReference[ExecutionGraph]]() override def receiveWithLogMessages: Receive = { + /* Receive Execution Graph to archive */ case ArchiveExecutionGraph(jobID, graph) => { // wrap graph inside a soft reference @@ -51,15 +53,15 @@ ActorLogging { case RequestJob(jobID) => { getGraph(jobID) match { - case graph: ExecutionGraph => sender ! JobFound(jobID, graph) - case _ => sender ! JobNotFound(jobID) + case Some(graph) => sender ! JobFound(jobID, graph) + case None => sender ! JobNotFound(jobID) } } case RequestJobStatus(jobID) => { getGraph(jobID) match { - case graph: ExecutionGraph => sender ! CurrentJobStatus(jobID, graph.getState) - case _ => sender ! JobNotFound(jobID) + case Some(graph) => sender ! CurrentJobStatus(jobID, graph.getState) + case None => sender ! JobNotFound(jobID) } } } @@ -68,23 +70,18 @@ ActorLogging { * Gets all graphs that have not been garbage collected. * @return An iterable with all valid ExecutionGraphs */ - def getAllGraphs() = graphs.values.flatMap(ref => ref.get match { - case Some(graph) => Seq(graph) - case _ => Seq() - }) + protected def getAllGraphs(): Iterable[ExecutionGraph] = graphs.values.flatMap(_.get) /** * Gets a graph with a jobID if it has not been garbage collected. * @param jobID * @return ExecutionGraph or null */ - def getGraph(jobID: JobID) = graphs.get(jobID) match { - case Some(softRef) => softRef.get match { - case Some(graph) => graph - case None => null - } - case None => null + protected def getGraph(jobID: JobID): Option[ExecutionGraph] = graphs.get(jobID) match { + case Some(softRef) => softRef.get + case None => None } + /** * Remove old ExecutionGraphs belonging to a jobID http://git-wip-us.apache.org/repos/asf/flink/blob/56b7f85b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala index ca5f7e4..d3a3526 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala @@ -32,10 +32,10 @@ trait TestingMemoryArchivist extends ActorLogMessages { def receiveTestingMessages: Receive = { case RequestExecutionGraph(jobID) => val executionGraph = getGraph(jobID) - if (executionGraph != null) { - sender ! ExecutionGraphFound(jobID, executionGraph) - } else { - sender ! ExecutionGraphNotFound(jobID) + + executionGraph match { + case Some(graph) => sender ! ExecutionGraphFound(jobID, graph) + case None => sender ! ExecutionGraphNotFound(jobID) } } }
