fixed yarn build
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/c1201f47 Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/c1201f47 Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/c1201f47 Branch: refs/heads/master Commit: c1201f47e0d44e92da42adb23d27f60d9d494642 Parents: 7ad6921 Author: Prashant Sharma <[email protected]> Authored: Mon Dec 9 12:55:19 2013 +0530 Committer: Prashant Sharma <[email protected]> Committed: Mon Dec 9 13:00:50 2013 +0530 ---------------------------------------------------------------------- .../apache/spark/deploy/yarn/WorkerLauncher.scala | 15 ++++++--------- .../spark/deploy/yarn/YarnAllocationHandler.scala | 4 ++-- 2 files changed, 8 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c1201f47/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala index b67e068..6903884 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala @@ -27,10 +27,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.ipc.YarnRPC import org.apache.hadoop.yarn.util.{ConverterUtils, Records} import akka.actor._ -import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClientLifeCycleEvent} -import akka.remote.RemoteClientShutdown +import akka.remote._ import akka.actor.Terminated -import akka.remote.RemoteClientDisconnected import org.apache.spark.{SparkContext, Logging} import org.apache.spark.util.{Utils, AkkaUtils} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend @@ -55,19 +53,18 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte // This actor just working as a monitor to watch on Driver Actor. class MonitorActor(driverUrl: String) extends Actor { - var driver: ActorRef = null + var driver: ActorSelection = null override def preStart() { logInfo("Listen to driver: " + driverUrl) - driver = context.actorFor(driverUrl) + driver = context.actorSelection(driverUrl) driver ! "hello" - context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) - context.watch(driver) // Doesn't work with remote actors, but useful for testing + context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) } override def receive = { - case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) => - logInfo("Driver terminated or disconnected! Shutting down.") + case x: DisassociatedEvent => + logInfo(s"Driver terminated or disconnected! Shutting down. $x") driverClosed = true } } http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c1201f47/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index baa030b..a6ce1b6 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -34,7 +34,7 @@ import org.apache.hadoop.conf.Configuration import java.util.{Collections, Set => JSet} import java.lang.{Boolean => JBoolean} -object AllocationType extends Enumeration ("HOST", "RACK", "ANY") { +object AllocationType extends Enumeration { type AllocationType = Value val HOST, RACK, ANY = Value } @@ -370,7 +370,7 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM createResourceRequest(AllocationType.ANY, null, numWorkers, YarnAllocationHandler.PRIORITY) val containerRequests: ArrayBuffer[ResourceRequest] = - new ArrayBuffer[ResourceRequest](hostContainerRequests.size() + rackContainerRequests.size() + 1) + new ArrayBuffer[ResourceRequest](hostContainerRequests.size + rackContainerRequests.size + 1) containerRequests ++= hostContainerRequests containerRequests ++= rackContainerRequests
