Made some changes according to suggestions from @aarondav
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/e2a43b3d Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/e2a43b3d Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/e2a43b3d Branch: refs/heads/master Commit: e2a43b3dcce81fc99098510d09095e1be4bf3e29 Parents: ba55285 Author: Lian, Cheng <[email protected]> Authored: Mon Nov 11 12:21:54 2013 +0800 Committer: Lian, Cheng <[email protected]> Committed: Mon Nov 11 12:21:54 2013 +0800 ---------------------------------------------------------------------- .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 9 +++++---- .../main/scala/org/apache/spark/storage/BlockManager.scala | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e2a43b3d/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 7499570..42bb388 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -112,9 +112,10 @@ class DAGScheduler( private val eventProcessActor: ActorRef = env.actorSystem.actorOf(Props(new Actor { override def preStart() { - env.actorSystem.scheduler.schedule(RESUBMIT_TIMEOUT milliseconds, RESUBMIT_TIMEOUT milliseconds) { - if (failed.size > 0) + context.system.scheduler.schedule(RESUBMIT_TIMEOUT milliseconds, RESUBMIT_TIMEOUT milliseconds) { + if (failed.size > 0) { resubmitFailedStages() + } } } @@ -853,7 +854,7 @@ class DAGScheduler( // If the RDD has narrow dependencies, pick the first partition of the first narrow dep // that has any placement preferences. Ideally we would choose based on transfer sizes, // but this will do for now. - rdd.dependencies.foreach(_ match { + rdd.dependencies.foreach { case n: NarrowDependency[_] => for (inPart <- n.getParents(partition)) { val locs = getPreferredLocs(n.rdd, inPart) @@ -861,7 +862,7 @@ class DAGScheduler( return locs } case _ => - }) + } Nil } http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e2a43b3d/core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 2c21134..702aca8 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -891,7 +891,7 @@ private[spark] object BlockManager extends Logging { blockManagerMaster: BlockManagerMaster = null) : Map[BlockId, Seq[BlockManagerId]] = { - // env == null and blockManagerMaster != null is used in tests + // blockManagerMaster != null is used in tests assert (env != null || blockManagerMaster != null) val blockLocations: Seq[Seq[BlockManagerId]] = if (blockManagerMaster == null) { env.blockManager.getLocationBlockIds(blockIds)
