http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git 
a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
 
b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
deleted file mode 100644
index 7d453ec..0000000
--- 
a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.lang.{Boolean => JBoolean}
-import java.io.File
-import java.util.{Collections, Set => JSet}
-import java.util.regex.Matcher
-import java.util.regex.Pattern
-import java.util.concurrent.ConcurrentHashMap
-
-import scala.collection.mutable.HashMap
-
-import org.apache.hadoop.io.Text
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.security.Credentials
-import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType
-import org.apache.hadoop.yarn.util.RackResolver
-import org.apache.hadoop.conf.Configuration
-
-import org.apache.spark.{SecurityManager, SparkConf}
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.util.Utils
-
-/**
- * Contains util methods to interact with Hadoop from spark.
- */
-class YarnSparkHadoopUtil extends SparkHadoopUtil {
-
-  override def transferCredentials(source: UserGroupInformation, dest: 
UserGroupInformation) {
-    dest.addCredentials(source.getCredentials())
-  }
-
-  // Note that all params which start with SPARK are propagated all the way 
through, so if in yarn mode, this MUST be set to true.
-  override def isYarnMode(): Boolean = { true }
-
-  // Return an appropriate (subclass) of Configuration. Creating config can 
initializes some hadoop subsystems
-  // Always create a new config, dont reuse yarnConf.
-  override def newConfiguration(conf: SparkConf): Configuration =
-    new YarnConfiguration(super.newConfiguration(conf))
-
-  // add any user credentials to the job conf which are necessary for running 
on a secure Hadoop cluster
-  override def addCredentials(conf: JobConf) {
-    val jobCreds = conf.getCredentials()
-    jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
-  }
-
-  override def getCurrentUserCredentials(): Credentials = {
-    UserGroupInformation.getCurrentUser().getCredentials()
-  }
-
-  override def addCurrentUserCredentials(creds: Credentials) {
-    UserGroupInformation.getCurrentUser().addCredentials(creds)
-  }
-
-  override def addSecretKeyToUserCredentials(key: String, secret: String) {
-    val creds = new Credentials()
-    creds.addSecretKey(new Text(key), secret.getBytes("utf-8"))
-    addCurrentUserCredentials(creds)
-  }
-
-  override def getSecretKeyFromUserCredentials(key: String): Array[Byte] = {
-    val credentials = getCurrentUserCredentials()
-    if (credentials != null) credentials.getSecretKey(new Text(key)) else null
-  }
-
-}
-
-object YarnSparkHadoopUtil {
-  // Additional memory overhead 
-  // 7% was arrived at experimentally. In the interest of minimizing memory 
waste while covering
-  // the common cases. Memory overhead tends to grow with container size. 
-
-  val MEMORY_OVERHEAD_FACTOR = 0.07
-  val MEMORY_OVERHEAD_MIN = 384
-
-  val ANY_HOST = "*"
-
-  val DEFAULT_NUMBER_EXECUTORS = 2
-
-  // All RM requests are issued with same priority : we do not (yet) have any 
distinction between
-  // request types (like map/reduce in hadoop for example)
-  val RM_REQUEST_PRIORITY = 1
-
-  // Host to rack map - saved from allocation requests. We are expecting this 
not to change.
-  // Note that it is possible for this to change : and ResourceManager will 
indicate that to us via
-  // update response to allocate. But we are punting on handling that for now.
-  private val hostToRack = new ConcurrentHashMap[String, String]()
-  private val rackToHostSet = new ConcurrentHashMap[String, JSet[String]]()
-
-  /**
-   * Add a path variable to the given environment map.
-   * If the map already contains this key, append the value to the existing 
value instead.
-   */
-  def addPathToEnvironment(env: HashMap[String, String], key: String, value: 
String): Unit = {
-    val newValue = if (env.contains(key)) { env(key) + File.pathSeparator + 
value } else value
-    env.put(key, newValue)
-  }
-
-  /**
-   * Set zero or more environment variables specified by the given input 
string.
-   * The input string is expected to take the form 
"KEY1=VAL1,KEY2=VAL2,KEY3=VAL3".
-   */
-  def setEnvFromInputString(env: HashMap[String, String], inputString: 
String): Unit = {
-    if (inputString != null && inputString.length() > 0) {
-      val childEnvs = inputString.split(",")
-      val p = Pattern.compile(environmentVariableRegex)
-      for (cEnv <- childEnvs) {
-        val parts = cEnv.split("=") // split on '='
-        val m = p.matcher(parts(1))
-        val sb = new StringBuffer
-        while (m.find()) {
-          val variable = m.group(1)
-          var replace = ""
-          if (env.get(variable) != None) {
-            replace = env.get(variable).get
-          } else {
-            // if this key is not configured for the child .. get it from the 
env
-            replace = System.getenv(variable)
-            if (replace == null) {
-            // the env key is note present anywhere .. simply set it
-              replace = ""
-            }
-          }
-          m.appendReplacement(sb, Matcher.quoteReplacement(replace))
-        }
-        m.appendTail(sb)
-        // This treats the environment variable as path variable delimited by 
`File.pathSeparator`
-        // This is kept for backward compatibility and consistency with 
Hadoop's behavior
-        addPathToEnvironment(env, parts(0), sb.toString)
-      }
-    }
-  }
-
-  private val environmentVariableRegex: String = {
-    if (Utils.isWindows) {
-      "%([A-Za-z_][A-Za-z0-9_]*?)%"
-    } else {
-      "\\$([A-Za-z_][A-Za-z0-9_]*)"
-    }
-  }
-
-  /**
-   * Escapes a string for inclusion in a command line executed by Yarn. Yarn 
executes commands
-   * using `bash -c "command arg1 arg2"` and that means plain quoting doesn't 
really work. The
-   * argument is enclosed in single quotes and some key characters are escaped.
-   *
-   * @param arg A single argument.
-   * @return Argument quoted for execution via Yarn's generated shell script.
-   */
-  def escapeForShell(arg: String): String = {
-    if (arg != null) {
-      val escaped = new StringBuilder("'")
-      for (i <- 0 to arg.length() - 1) {
-        arg.charAt(i) match {
-          case '$' => escaped.append("\\$")
-          case '"' => escaped.append("\\\"")
-          case '\'' => escaped.append("'\\''")
-          case c => escaped.append(c)
-        }
-      }
-      escaped.append("'").toString()
-    } else {
-      arg
-    }
-  }
-
-  def lookupRack(conf: Configuration, host: String): String = {
-    if (!hostToRack.contains(host)) {
-      populateRackInfo(conf, host)
-    }
-    hostToRack.get(host)
-  }
-
-  def populateRackInfo(conf: Configuration, hostname: String) {
-    Utils.checkHost(hostname)
-
-    if (!hostToRack.containsKey(hostname)) {
-      // If there are repeated failures to resolve, all to an ignore list.
-      val rackInfo = RackResolver.resolve(conf, hostname)
-      if (rackInfo != null && rackInfo.getNetworkLocation != null) {
-        val rack = rackInfo.getNetworkLocation
-        hostToRack.put(hostname, rack)
-        if (! rackToHostSet.containsKey(rack)) {
-          rackToHostSet.putIfAbsent(rack,
-            Collections.newSetFromMap(new ConcurrentHashMap[String, 
JBoolean]()))
-        }
-        rackToHostSet.get(rack).add(hostname)
-
-        // TODO(harvey): Figure out what this comment means...
-        // Since RackResolver caches, we are disabling this for now ...
-      } /* else {
-        // right ? Else we will keep calling rack resolver in case we cant 
resolve rack info ...
-        hostToRack.put(hostname, null)
-      } */
-    }
-  }
-
-  def getApplicationAclsForYarn(securityMgr: SecurityManager)
-      : Map[ApplicationAccessType, String] = {
-    Map[ApplicationAccessType, String] (
-      ApplicationAccessType.VIEW_APP -> securityMgr.getViewAcls,
-      ApplicationAccessType.MODIFY_APP -> securityMgr.getModifyAcls
-    )
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
----------------------------------------------------------------------
diff --git 
a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
 
b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
deleted file mode 100644
index 254774a..0000000
--- 
a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import org.apache.spark._
-import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
-import org.apache.spark.scheduler.TaskSchedulerImpl
-import org.apache.spark.util.Utils
-
-/**
- * This scheduler launches executors through Yarn - by calling into Client to 
launch the Spark AM.
- */
-private[spark] class YarnClientClusterScheduler(sc: SparkContext) extends 
TaskSchedulerImpl(sc) {
-
-  // By default, rack is unknown
-  override def getRackForHost(hostPort: String): Option[String] = {
-    val host = Utils.parseHostPort(hostPort)._1
-    Option(YarnSparkHadoopUtil.lookupRack(sc.hadoopConfiguration, host))
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
 
b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
deleted file mode 100644
index 2923e67..0000000
--- 
a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
-
-import org.apache.spark.{SparkException, Logging, SparkContext}
-import org.apache.spark.deploy.yarn.{Client, ClientArguments}
-import org.apache.spark.scheduler.TaskSchedulerImpl
-
-private[spark] class YarnClientSchedulerBackend(
-    scheduler: TaskSchedulerImpl,
-    sc: SparkContext)
-  extends YarnSchedulerBackend(scheduler, sc)
-  with Logging {
-
-  private var client: Client = null
-  private var appId: ApplicationId = null
-  @volatile private var stopping: Boolean = false
-
-  /**
-   * Create a Yarn client to submit an application to the ResourceManager.
-   * This waits until the application is running.
-   */
-  override def start() {
-    super.start()
-    val driverHost = conf.get("spark.driver.host")
-    val driverPort = conf.get("spark.driver.port")
-    val hostport = driverHost + ":" + driverPort
-    sc.ui.foreach { ui => conf.set("spark.driver.appUIAddress", 
ui.appUIAddress) }
-
-    val argsArrayBuf = new ArrayBuffer[String]()
-    argsArrayBuf += ("--arg", hostport)
-    argsArrayBuf ++= getExtraClientArguments
-
-    logDebug("ClientArguments called with: " + argsArrayBuf.mkString(" "))
-    val args = new ClientArguments(argsArrayBuf.toArray, conf)
-    totalExpectedExecutors = args.numExecutors
-    client = new Client(args, conf)
-    appId = client.submitApplication()
-    waitForApplication()
-    asyncMonitorApplication()
-  }
-
-  /**
-   * Return any extra command line arguments to be passed to Client provided 
in the form of
-   * environment variables or Spark properties.
-   */
-  private def getExtraClientArguments: Seq[String] = {
-    val extraArgs = new ArrayBuffer[String]
-    val optionTuples = // List of (target Client argument, environment 
variable, Spark property)
-      List(
-        ("--driver-memory", "SPARK_MASTER_MEMORY", "spark.master.memory"),
-        ("--driver-memory", "SPARK_DRIVER_MEMORY", "spark.driver.memory"),
-        ("--num-executors", "SPARK_WORKER_INSTANCES", 
"spark.executor.instances"),
-        ("--num-executors", "SPARK_EXECUTOR_INSTANCES", 
"spark.executor.instances"),
-        ("--executor-memory", "SPARK_WORKER_MEMORY", "spark.executor.memory"),
-        ("--executor-memory", "SPARK_EXECUTOR_MEMORY", 
"spark.executor.memory"),
-        ("--executor-cores", "SPARK_WORKER_CORES", "spark.executor.cores"),
-        ("--executor-cores", "SPARK_EXECUTOR_CORES", "spark.executor.cores"),
-        ("--queue", "SPARK_YARN_QUEUE", "spark.yarn.queue"),
-        ("--name", "SPARK_YARN_APP_NAME", "spark.app.name")
-      )
-    optionTuples.foreach { case (optionName, envVar, sparkProp) =>
-      if (System.getenv(envVar) != null) {
-        extraArgs += (optionName, System.getenv(envVar))
-      } else if (sc.getConf.contains(sparkProp)) {
-        extraArgs += (optionName, sc.getConf.get(sparkProp))
-      }
-    }
-    extraArgs
-  }
-
-  /**
-   * Report the state of the application until it is running.
-   * If the application has finished, failed or been killed in the process, 
throw an exception.
-   * This assumes both `client` and `appId` have already been set.
-   */
-  private def waitForApplication(): Unit = {
-    assert(client != null && appId != null, "Application has not been 
submitted yet!")
-    val (state, _) = client.monitorApplication(appId, returnOnRunning = true) 
// blocking
-    if (state == YarnApplicationState.FINISHED ||
-      state == YarnApplicationState.FAILED ||
-      state == YarnApplicationState.KILLED) {
-      throw new SparkException("Yarn application has already ended! " +
-        "It might have been killed or unable to launch application master.")
-    }
-    if (state == YarnApplicationState.RUNNING) {
-      logInfo(s"Application $appId has started running.")
-    }
-  }
-
-  /**
-   * Monitor the application state in a separate thread.
-   * If the application has exited for any reason, stop the SparkContext.
-   * This assumes both `client` and `appId` have already been set.
-   */
-  private def asyncMonitorApplication(): Unit = {
-    assert(client != null && appId != null, "Application has not been 
submitted yet!")
-    val t = new Thread {
-      override def run() {
-        while (!stopping) {
-          val report = client.getApplicationReport(appId)
-          val state = report.getYarnApplicationState()
-          if (state == YarnApplicationState.FINISHED ||
-            state == YarnApplicationState.KILLED ||
-            state == YarnApplicationState.FAILED) {
-            logError(s"Yarn application has already exited with state $state!")
-            sc.stop()
-            stopping = true
-          }
-          Thread.sleep(1000L)
-        }
-        Thread.currentThread().interrupt()
-      }
-    }
-    t.setName("Yarn application state monitor")
-    t.setDaemon(true)
-    t.start()
-  }
-
-  /**
-   * Stop the scheduler. This assumes `start()` has already been called.
-   */
-  override def stop() {
-    assert(client != null, "Attempted to stop this scheduler before starting 
it!")
-    stopping = true
-    super.stop()
-    client.stop()
-    logInfo("Stopped")
-  }
-
-  override def applicationId(): String = {
-    Option(appId).map(_.toString).getOrElse {
-      logWarning("Application ID is not initialized yet.")
-      super.applicationId
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
----------------------------------------------------------------------
diff --git 
a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
 
b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
deleted file mode 100644
index 4157ff9..0000000
--- 
a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import org.apache.spark._
-import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnSparkHadoopUtil}
-import org.apache.spark.scheduler.TaskSchedulerImpl
-import org.apache.spark.util.Utils
-
-/**
- * This is a simple extension to ClusterScheduler - to ensure that appropriate 
initialization of
- * ApplicationMaster, etc is done
- */
-private[spark] class YarnClusterScheduler(sc: SparkContext) extends 
TaskSchedulerImpl(sc) {
-
-  logInfo("Created YarnClusterScheduler")
-
-  // Nothing else for now ... initialize application master : which needs a 
SparkContext to
-  // determine how to allocate.
-  // Note that only the first creation of a SparkContext influences (and 
ideally, there must be
-  // only one SparkContext, right ?). Subsequent creations are ignored since 
executors are already
-  // allocated by then.
-
-  // By default, rack is unknown
-  override def getRackForHost(hostPort: String): Option[String] = {
-    val host = Utils.parseHostPort(hostPort)._1
-    Option(YarnSparkHadoopUtil.lookupRack(sc.hadoopConfiguration, host))
-  }
-
-  override def postStartHook() {
-    ApplicationMaster.sparkContextInitialized(sc)
-    super.postStartHook()
-    logInfo("YarnClusterScheduler.postStartHook done")
-  }
-
-  override def stop() {
-    super.stop()
-    ApplicationMaster.sparkContextStopped(sc)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
 
b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
deleted file mode 100644
index b1de81e..0000000
--- 
a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import org.apache.spark.SparkContext
-import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
-import org.apache.spark.scheduler.TaskSchedulerImpl
-import org.apache.spark.util.IntParam
-
-private[spark] class YarnClusterSchedulerBackend(
-    scheduler: TaskSchedulerImpl,
-    sc: SparkContext)
-  extends YarnSchedulerBackend(scheduler, sc) {
-
-  override def start() {
-    super.start()
-    totalExpectedExecutors = DEFAULT_NUMBER_EXECUTORS
-    if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) {
-      totalExpectedExecutors = 
IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES"))
-        .getOrElse(totalExpectedExecutors)
-    }
-    // System property can override environment variable.
-    totalExpectedExecutors = sc.getConf.getInt("spark.executor.instances", 
totalExpectedExecutors)
-  }
-
-  override def applicationId(): String =
-    // In YARN Cluster mode, spark.yarn.app.id is expect to be set
-    // before user application is launched.
-    // So, if spark.yarn.app.id is not set, it is something wrong.
-    sc.getConf.getOption("spark.yarn.app.id").getOrElse {
-      logError("Application ID is not set.")
-      super.applicationId
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala
----------------------------------------------------------------------
diff --git 
a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala 
b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala
deleted file mode 100644
index 17b79ae..0000000
--- 
a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala
+++ /dev/null
@@ -1,256 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.io.File
-import java.net.URI
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapreduce.MRJobConfig
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.mockito.Matchers._
-import org.mockito.Mockito._
-
-
-import org.scalatest.FunSuite
-import org.scalatest.Matchers
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.{ HashMap => MutableHashMap }
-import scala.reflect.ClassTag
-import scala.util.Try
-
-import org.apache.spark.{SparkException, SparkConf}
-import org.apache.spark.util.Utils
-
-class ClientBaseSuite extends FunSuite with Matchers {
-
-  test("default Yarn application classpath") {
-    ClientBase.getDefaultYarnApplicationClasspath should 
be(Some(Fixtures.knownDefYarnAppCP))
-  }
-
-  test("default MR application classpath") {
-    ClientBase.getDefaultMRApplicationClasspath should 
be(Some(Fixtures.knownDefMRAppCP))
-  }
-
-  test("resultant classpath for an application that defines a classpath for 
YARN") {
-    withAppConf(Fixtures.mapYARNAppConf) { conf =>
-      val env = newEnv
-      ClientBase.populateHadoopClasspath(conf, env)
-      classpath(env) should be(
-        flatten(Fixtures.knownYARNAppCP, 
ClientBase.getDefaultMRApplicationClasspath))
-    }
-  }
-
-  test("resultant classpath for an application that defines a classpath for 
MR") {
-    withAppConf(Fixtures.mapMRAppConf) { conf =>
-      val env = newEnv
-      ClientBase.populateHadoopClasspath(conf, env)
-      classpath(env) should be(
-        flatten(ClientBase.getDefaultYarnApplicationClasspath, 
Fixtures.knownMRAppCP))
-    }
-  }
-
-  test("resultant classpath for an application that defines both classpaths, 
YARN and MR") {
-    withAppConf(Fixtures.mapAppConf) { conf =>
-      val env = newEnv
-      ClientBase.populateHadoopClasspath(conf, env)
-      classpath(env) should be(flatten(Fixtures.knownYARNAppCP, 
Fixtures.knownMRAppCP))
-    }
-  }
-
-  private val SPARK = "local:/sparkJar"
-  private val USER = "local:/userJar"
-  private val ADDED = "local:/addJar1,local:/addJar2,/addJar3"
-
-  test("Local jar URIs") {
-    val conf = new Configuration()
-    val sparkConf = new SparkConf().set(ClientBase.CONF_SPARK_JAR, SPARK)
-    val env = new MutableHashMap[String, String]()
-    val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), 
sparkConf)
-
-    ClientBase.populateClasspath(args, conf, sparkConf, env)
-
-    val cp = env("CLASSPATH").split(File.pathSeparator)
-    s"$SPARK,$USER,$ADDED".split(",").foreach({ entry =>
-      val uri = new URI(entry)
-      if (ClientBase.LOCAL_SCHEME.equals(uri.getScheme())) {
-        cp should contain (uri.getPath())
-      } else {
-        cp should not contain (uri.getPath())
-      }
-    })
-    cp should contain (Environment.PWD.$())
-    cp should contain (s"${Environment.PWD.$()}${File.separator}*")
-    cp should not contain (ClientBase.SPARK_JAR)
-    cp should not contain (ClientBase.APP_JAR)
-  }
-
-  test("Jar path propagation through SparkConf") {
-    val conf = new Configuration()
-    val sparkConf = new SparkConf().set(ClientBase.CONF_SPARK_JAR, SPARK)
-    val yarnConf = new YarnConfiguration()
-    val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), 
sparkConf)
-
-    val client = spy(new DummyClient(args, conf, sparkConf, yarnConf))
-    doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]),
-      any(classOf[Path]), anyShort(), anyBoolean())
-
-    val tempDir = Utils.createTempDir()
-    try {
-      client.prepareLocalResources(tempDir.getAbsolutePath())
-      sparkConf.getOption(ClientBase.CONF_SPARK_USER_JAR) should be 
(Some(USER))
-
-      // The non-local path should be propagated by name only, since it will 
end up in the app's
-      // staging dir.
-      val expected = ADDED.split(",")
-        .map(p => {
-          val uri = new URI(p)
-          if (ClientBase.LOCAL_SCHEME == uri.getScheme()) {
-            p
-          } else {
-            Option(uri.getFragment()).getOrElse(new File(p).getName())
-          }
-        })
-        .mkString(",")
-
-      sparkConf.getOption(ClientBase.CONF_SPARK_YARN_SECONDARY_JARS) should be 
(Some(expected))
-    } finally {
-      Utils.deleteRecursively(tempDir)
-    }
-  }
-
-  test("check access nns empty") {
-    val sparkConf = new SparkConf()
-    sparkConf.set("spark.yarn.access.namenodes", "")
-    val nns = ClientBase.getNameNodesToAccess(sparkConf)
-    nns should be(Set())
-  }
-
-  test("check access nns unset") {
-    val sparkConf = new SparkConf()
-    val nns = ClientBase.getNameNodesToAccess(sparkConf)
-    nns should be(Set())
-  }
-
-  test("check access nns") {
-    val sparkConf = new SparkConf()
-    sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032")
-    val nns = ClientBase.getNameNodesToAccess(sparkConf)
-    nns should be(Set(new Path("hdfs://nn1:8032")))
-  }
-
-  test("check access nns space") {
-    val sparkConf = new SparkConf()
-    sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032, ")
-    val nns = ClientBase.getNameNodesToAccess(sparkConf)
-    nns should be(Set(new Path("hdfs://nn1:8032")))
-  }
-
-  test("check access two nns") {
-    val sparkConf = new SparkConf()
-    sparkConf.set("spark.yarn.access.namenodes", 
"hdfs://nn1:8032,hdfs://nn2:8032")
-    val nns = ClientBase.getNameNodesToAccess(sparkConf)
-    nns should be(Set(new Path("hdfs://nn1:8032"), new 
Path("hdfs://nn2:8032")))
-  }
-
-  test("check token renewer") {
-    val hadoopConf = new Configuration()
-    hadoopConf.set("yarn.resourcemanager.address", "myrm:8033")
-    hadoopConf.set("yarn.resourcemanager.principal", 
"yarn/myrm:[email protected]")
-    val renewer = ClientBase.getTokenRenewer(hadoopConf)
-    renewer should be ("yarn/myrm:[email protected]")
-  }
-
-  test("check token renewer default") {
-    val hadoopConf = new Configuration()
-    val caught =
-      intercept[SparkException] {
-        ClientBase.getTokenRenewer(hadoopConf)
-      }
-    assert(caught.getMessage === "Can't get Master Kerberos principal for use 
as renewer")
-  }
-
-  object Fixtures {
-
-    val knownDefYarnAppCP: Seq[String] =
-      getFieldValue[Array[String], Seq[String]](classOf[YarnConfiguration],
-                                                
"DEFAULT_YARN_APPLICATION_CLASSPATH",
-                                                Seq[String]())(a => a.toSeq)
-
-
-    val knownDefMRAppCP: Seq[String] =
-      getFieldValue2[String, Array[String], Seq[String]](
-        classOf[MRJobConfig],
-        "DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH",
-        Seq[String]())(a => a.split(","))(a => a.toSeq)
-
-    val knownYARNAppCP = Some(Seq("/known/yarn/path"))
-
-    val knownMRAppCP = Some(Seq("/known/mr/path"))
-
-    val mapMRAppConf =
-      Map("mapreduce.application.classpath" -> 
knownMRAppCP.map(_.mkString(":")).get)
-
-    val mapYARNAppConf =
-      Map(YarnConfiguration.YARN_APPLICATION_CLASSPATH -> 
knownYARNAppCP.map(_.mkString(":")).get)
-
-    val mapAppConf = mapYARNAppConf ++ mapMRAppConf
-  }
-
-  def withAppConf(m: Map[String, String] = Map())(testCode: (Configuration) => 
Any) {
-    val conf = new Configuration
-    m.foreach { case (k, v) => conf.set(k, v, "ClientBaseSpec") }
-    testCode(conf)
-  }
-
-  def newEnv = MutableHashMap[String, String]()
-
-  def classpath(env: MutableHashMap[String, String]) = 
env(Environment.CLASSPATH.name).split(":|;")
-
-  def flatten(a: Option[Seq[String]], b: Option[Seq[String]]) = (a ++ 
b).flatten.toArray
-
-  def getFieldValue[A, B](clazz: Class[_], field: String, defaults: => 
B)(mapTo: A => B): B =
-    
Try(clazz.getField(field)).map(_.get(null).asInstanceOf[A]).toOption.map(mapTo).getOrElse(defaults)
-
-  def getFieldValue2[A: ClassTag, A1: ClassTag, B](
-        clazz: Class[_],
-        field: String,
-        defaults: => B)(mapTo:  A => B)(mapTo1: A1 => B) : B = {
-    Try(clazz.getField(field)).map(_.get(null)).map {
-      case v: A => mapTo(v)
-      case v1: A1 => mapTo1(v1)
-      case _ => defaults
-    }.toOption.getOrElse(defaults)
-  }
-
-  private class DummyClient(
-      val args: ClientArguments,
-      val hadoopConf: Configuration,
-      val sparkConf: SparkConf,
-      val yarnConf: YarnConfiguration) extends ClientBase {
-    override def setupSecurityToken(amContainer: ContainerLaunchContext): Unit 
= ???
-    override def submitApplication(): ApplicationId = ???
-    override def getApplicationReport(appId: ApplicationId): ApplicationReport 
= ???
-    override def getClientToken(report: ApplicationReport): String = ???
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
 
b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
deleted file mode 100644
index 80b57d1..0000000
--- 
a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.net.URI
-
-import org.scalatest.FunSuite
-import org.scalatest.mock.MockitoSugar
-import org.mockito.Mockito.when
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.FileStatus
-import org.apache.hadoop.fs.FileSystem
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.fs.permission.FsAction
-import org.apache.hadoop.yarn.api.records.LocalResource
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility
-import org.apache.hadoop.yarn.api.records.LocalResourceType
-import org.apache.hadoop.yarn.util.{Records, ConverterUtils}
-
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.Map
-
-
-class ClientDistributedCacheManagerSuite extends FunSuite with MockitoSugar {
-
-  class MockClientDistributedCacheManager extends 
ClientDistributedCacheManager {
-    override def getVisibility(conf: Configuration, uri: URI, statCache: 
Map[URI, FileStatus]): 
-        LocalResourceVisibility = {
-      LocalResourceVisibility.PRIVATE
-    }
-  }
-  
-  test("test getFileStatus empty") {
-    val distMgr = new ClientDistributedCacheManager()
-    val fs = mock[FileSystem]
-    val uri = new URI("/tmp/testing")
-    when(fs.getFileStatus(new Path(uri))).thenReturn(new FileStatus())
-    val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
-    val stat = distMgr.getFileStatus(fs, uri, statCache)
-    assert(stat.getPath() === null)
-  }
-
-  test("test getFileStatus cached") {
-    val distMgr = new ClientDistributedCacheManager()
-    val fs = mock[FileSystem]
-    val uri = new URI("/tmp/testing")
-    val realFileStatus = new FileStatus(10, false, 1, 1024, 10, 10, null, 
"testOwner", 
-      null, new Path("/tmp/testing"))
-    when(fs.getFileStatus(new Path(uri))).thenReturn(new FileStatus())
-    val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus](uri -> 
realFileStatus)
-    val stat = distMgr.getFileStatus(fs, uri, statCache)
-    assert(stat.getPath().toString() === "/tmp/testing")
-  }
-
-  test("test addResource") {
-    val distMgr = new MockClientDistributedCacheManager()
-    val fs = mock[FileSystem]
-    val conf = new Configuration()
-    val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing")
-    val localResources = HashMap[String, LocalResource]()
-    val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
-    when(fs.getFileStatus(destPath)).thenReturn(new FileStatus())
-
-    distMgr.addResource(fs, conf, destPath, localResources, 
LocalResourceType.FILE, "link", 
-      statCache, false)
-    val resource = localResources("link")
-    assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE)
-    assert(ConverterUtils.getPathFromYarnURL(resource.getResource()) === 
destPath)
-    assert(resource.getTimestamp() === 0)
-    assert(resource.getSize() === 0)
-    assert(resource.getType() === LocalResourceType.FILE)
-
-    val env = new HashMap[String, String]()
-    distMgr.setDistFilesEnv(env)
-    assert(env("SPARK_YARN_CACHE_FILES") === 
"file:/foo.invalid.com:8080/tmp/testing#link")
-    assert(env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === "0")
-    assert(env("SPARK_YARN_CACHE_FILES_FILE_SIZES") === "0")
-    assert(env("SPARK_YARN_CACHE_FILES_VISIBILITIES") === 
LocalResourceVisibility.PRIVATE.name())
-
-    distMgr.setDistArchivesEnv(env)
-    assert(env.get("SPARK_YARN_CACHE_ARCHIVES") === None)
-    assert(env.get("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === None)
-    assert(env.get("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === None)
-    assert(env.get("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === None)
-
-    // add another one and verify both there and order correct
-    val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, 
"testOwner", 
-      null, new Path("/tmp/testing2"))
-    val destPath2 = new Path("file:///foo.invalid.com:8080/tmp/testing2")
-    when(fs.getFileStatus(destPath2)).thenReturn(realFileStatus)
-    distMgr.addResource(fs, conf, destPath2, localResources, 
LocalResourceType.FILE, "link2", 
-      statCache, false)
-    val resource2 = localResources("link2")
-    assert(resource2.getVisibility() === LocalResourceVisibility.PRIVATE)
-    assert(ConverterUtils.getPathFromYarnURL(resource2.getResource()) === 
destPath2)
-    assert(resource2.getTimestamp() === 10)
-    assert(resource2.getSize() === 20)
-    assert(resource2.getType() === LocalResourceType.FILE)
-
-    val env2 = new HashMap[String, String]()
-    distMgr.setDistFilesEnv(env2)
-    val timestamps = env2("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',')
-    val files = env2("SPARK_YARN_CACHE_FILES").split(',') 
-    val sizes = env2("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',')
-    val visibilities = env2("SPARK_YARN_CACHE_FILES_VISIBILITIES") .split(',')
-    assert(files(0) === "file:/foo.invalid.com:8080/tmp/testing#link")
-    assert(timestamps(0)  === "0")
-    assert(sizes(0)  === "0")
-    assert(visibilities(0) === LocalResourceVisibility.PRIVATE.name())
-
-    assert(files(1) === "file:/foo.invalid.com:8080/tmp/testing2#link2")
-    assert(timestamps(1)  === "10")
-    assert(sizes(1)  === "20")
-    assert(visibilities(1) === LocalResourceVisibility.PRIVATE.name())
-  }
-
-  test("test addResource link null") {
-    val distMgr = new MockClientDistributedCacheManager()
-    val fs = mock[FileSystem]
-    val conf = new Configuration()
-    val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing")
-    val localResources = HashMap[String, LocalResource]()
-    val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
-    when(fs.getFileStatus(destPath)).thenReturn(new FileStatus())
-
-    intercept[Exception] {
-      distMgr.addResource(fs, conf, destPath, localResources, 
LocalResourceType.FILE, null, 
-        statCache, false)
-    }
-    assert(localResources.get("link") === None)
-    assert(localResources.size === 0)
-  }
-
-  test("test addResource appmaster only") {
-    val distMgr = new MockClientDistributedCacheManager()
-    val fs = mock[FileSystem]
-    val conf = new Configuration()
-    val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing")
-    val localResources = HashMap[String, LocalResource]()
-    val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
-    val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, 
"testOwner", 
-      null, new Path("/tmp/testing"))
-    when(fs.getFileStatus(destPath)).thenReturn(realFileStatus)
-
-    distMgr.addResource(fs, conf, destPath, localResources, 
LocalResourceType.ARCHIVE, "link", 
-      statCache, true)
-    val resource = localResources("link")
-    assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE)
-    assert(ConverterUtils.getPathFromYarnURL(resource.getResource()) === 
destPath)
-    assert(resource.getTimestamp() === 10)
-    assert(resource.getSize() === 20)
-    assert(resource.getType() === LocalResourceType.ARCHIVE)
-
-    val env = new HashMap[String, String]()
-    distMgr.setDistFilesEnv(env)
-    assert(env.get("SPARK_YARN_CACHE_FILES") === None)
-    assert(env.get("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === None)
-    assert(env.get("SPARK_YARN_CACHE_FILES_FILE_SIZES") === None)
-    assert(env.get("SPARK_YARN_CACHE_FILES_VISIBILITIES") === None)
-
-    distMgr.setDistArchivesEnv(env)
-    assert(env.get("SPARK_YARN_CACHE_ARCHIVES") === None)
-    assert(env.get("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === None)
-    assert(env.get("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === None)
-    assert(env.get("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === None)
-  }
-
-  test("test addResource archive") {
-    val distMgr = new MockClientDistributedCacheManager()
-    val fs = mock[FileSystem]
-    val conf = new Configuration()
-    val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing")
-    val localResources = HashMap[String, LocalResource]()
-    val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
-    val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, 
"testOwner", 
-      null, new Path("/tmp/testing"))
-    when(fs.getFileStatus(destPath)).thenReturn(realFileStatus)
-
-    distMgr.addResource(fs, conf, destPath, localResources, 
LocalResourceType.ARCHIVE, "link", 
-      statCache, false)
-    val resource = localResources("link")
-    assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE)
-    assert(ConverterUtils.getPathFromYarnURL(resource.getResource()) === 
destPath)
-    assert(resource.getTimestamp() === 10)
-    assert(resource.getSize() === 20)
-    assert(resource.getType() === LocalResourceType.ARCHIVE)
-
-    val env = new HashMap[String, String]()
-
-    distMgr.setDistArchivesEnv(env)
-    assert(env("SPARK_YARN_CACHE_ARCHIVES") === 
"file:/foo.invalid.com:8080/tmp/testing#link")
-    assert(env("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === "10")
-    assert(env("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === "20")
-    assert(env("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === 
LocalResourceVisibility.PRIVATE.name())
-
-    distMgr.setDistFilesEnv(env)
-    assert(env.get("SPARK_YARN_CACHE_FILES") === None)
-    assert(env.get("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === None)
-    assert(env.get("SPARK_YARN_CACHE_FILES_FILE_SIZES") === None)
-    assert(env.get("SPARK_YARN_CACHE_FILES_VISIBILITIES") === None)
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
----------------------------------------------------------------------
diff --git 
a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
 
b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
deleted file mode 100644
index 8d184a0..0000000
--- 
a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import org.apache.spark.deploy.yarn.YarnAllocator._
-import org.scalatest.FunSuite
-
-class YarnAllocatorSuite extends FunSuite {
-  test("memory exceeded diagnostic regexes") {
-    val diagnostics =
-      "Container 
[pid=12465,containerID=container_1412887393566_0003_01_000002] is running " +
-      "beyond physical memory limits. Current usage: 2.1 MB of 2 GB physical 
memory used; " +
-      "5.8 GB of 4.2 GB virtual memory used. Killing container."
-    val vmemMsg = memLimitExceededLogMessage(diagnostics, 
VMEM_EXCEEDED_PATTERN)
-    val pmemMsg = memLimitExceededLogMessage(diagnostics, 
PMEM_EXCEEDED_PATTERN)
-    assert(vmemMsg.contains("5.8 GB of 4.2 GB virtual memory used."))
-    assert(pmemMsg.contains("2.1 MB of 2 GB physical memory used."))
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
----------------------------------------------------------------------
diff --git 
a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
 
b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
deleted file mode 100644
index 2cc5abb..0000000
--- 
a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.io.{File, IOException}
-
-import com.google.common.io.{ByteStreams, Files}
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.scalatest.{FunSuite, Matchers}
-
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType
-
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
-
-
-class YarnSparkHadoopUtilSuite extends FunSuite with Matchers with Logging {
-
-  val hasBash =
-    try {
-      val exitCode = Runtime.getRuntime().exec(Array("bash", 
"--version")).waitFor()
-      exitCode == 0
-    } catch {
-      case e: IOException =>
-        false
-    }
-
-  if (!hasBash) {
-    logWarning("Cannot execute bash, skipping bash tests.")
-  }
-
-  def bashTest(name: String)(fn: => Unit) =
-    if (hasBash) test(name)(fn) else ignore(name)(fn)
-
-  bashTest("shell script escaping") {
-    val scriptFile = File.createTempFile("script.", ".sh")
-    val args = Array("arg1", "${arg.2}", "\"arg3\"", "'arg4'", "$arg5", 
"\\arg6")
-    try {
-      val argLine = args.map(a => 
YarnSparkHadoopUtil.escapeForShell(a)).mkString(" ")
-      Files.write(("bash -c \"echo " + argLine + "\"").getBytes(), scriptFile)
-      scriptFile.setExecutable(true)
-
-      val proc = Runtime.getRuntime().exec(Array(scriptFile.getAbsolutePath()))
-      val out = new 
String(ByteStreams.toByteArray(proc.getInputStream())).trim()
-      val err = new String(ByteStreams.toByteArray(proc.getErrorStream()))
-      val exitCode = proc.waitFor()
-      exitCode should be (0)
-      out should be (args.mkString(" "))
-    } finally {
-      scriptFile.delete()
-    }
-  }
-
-  test("Yarn configuration override") {
-    val key = "yarn.nodemanager.hostname"
-    val default = new YarnConfiguration()
-
-    val sparkConf = new SparkConf()
-      .set("spark.hadoop." + key, "someHostName")
-    val yarnConf = new YarnSparkHadoopUtil().newConfiguration(sparkConf)
-
-    yarnConf.getClass() should be (classOf[YarnConfiguration])
-    yarnConf.get(key) should not be default.get(key)
-  }
-
-
-  test("test getApplicationAclsForYarn acls on") {
-
-    // spark acls on, just pick up default user
-    val sparkConf = new SparkConf()
-    sparkConf.set("spark.acls.enable", "true")
-
-    val securityMgr = new SecurityManager(sparkConf)
-    val acls = YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr)
-
-    val viewAcls = acls.get(ApplicationAccessType.VIEW_APP)
-    val modifyAcls = acls.get(ApplicationAccessType.MODIFY_APP)
-
-    viewAcls match {
-      case Some(vacls) => {
-        val aclSet = vacls.split(',').map(_.trim).toSet
-        assert(aclSet.contains(System.getProperty("user.name", "invalid")))
-      }
-      case None => {
-        fail()
-      }
-    }
-    modifyAcls match {
-      case Some(macls) => {
-        val aclSet = macls.split(',').map(_.trim).toSet
-        assert(aclSet.contains(System.getProperty("user.name", "invalid")))
-      }
-      case None => {
-        fail()
-      }
-    }
-  }
-
-  test("test getApplicationAclsForYarn acls on and specify users") {
-
-    // default spark acls are on and specify acls
-    val sparkConf = new SparkConf()
-    sparkConf.set("spark.acls.enable", "true")
-    sparkConf.set("spark.ui.view.acls", "user1,user2")
-    sparkConf.set("spark.modify.acls", "user3,user4")
-
-    val securityMgr = new SecurityManager(sparkConf)
-    val acls = YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr)
-
-    val viewAcls = acls.get(ApplicationAccessType.VIEW_APP)
-    val modifyAcls = acls.get(ApplicationAccessType.MODIFY_APP)
-
-    viewAcls match {
-      case Some(vacls) => {
-        val aclSet = vacls.split(',').map(_.trim).toSet
-        assert(aclSet.contains("user1"))
-        assert(aclSet.contains("user2"))
-        assert(aclSet.contains(System.getProperty("user.name", "invalid")))
-      }
-      case None => {
-        fail()
-      }
-    }
-    modifyAcls match {
-      case Some(macls) => {
-        val aclSet = macls.split(',').map(_.trim).toSet
-        assert(aclSet.contains("user3"))
-        assert(aclSet.contains("user4"))
-        assert(aclSet.contains(System.getProperty("user.name", "invalid")))
-      }
-      case None => {
-        fail()
-      }
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/pom.xml
----------------------------------------------------------------------
diff --git a/yarn/pom.xml b/yarn/pom.xml
index bba7364..d7579bf 100644
--- a/yarn/pom.xml
+++ b/yarn/pom.xml
@@ -25,9 +25,9 @@
   </parent>
 
   <groupId>org.apache.spark</groupId>
-  <artifactId>yarn-parent_2.10</artifactId>
-  <packaging>pom</packaging>
-  <name>Spark Project YARN Parent POM</name>
+  <artifactId>spark-yarn_2.10</artifactId>
+  <packaging>jar</packaging>
+  <name>Spark Project YARN</name>
   <properties>
     <sbt.project.name>yarn</sbt.project.name>
   </properties>
@@ -59,6 +59,12 @@
       <artifactId>hadoop-client</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-tests</artifactId>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.scalatest</groupId>
       <artifactId>scalatest_${scala.binary.version}</artifactId>
       <scope>test</scope>
@@ -70,41 +76,54 @@
     </dependency>
   </dependencies>
 
+  <!--
+    See SPARK-3710. hadoop-yarn-server-tests in Hadoop 2.2 fails to pull some 
needed
+    dependencies, so they need to be added manually for the tests to work.
+  -->
   <profiles>
     <profile>
-      <id>yarn-alpha</id>
-      <build>
-        <plugins>
-          <plugin>
-            <artifactId>maven-antrun-plugin</artifactId>
-            <executions>
-              <execution>
-                <phase>validate</phase>
-                <goals>
-                  <goal>run</goal>
-                </goals>
-                <configuration>
-                  <tasks>
-                    
<echo>*******************************************************************************************</echo>
-                    <echo>***WARNING***: Support for YARN-alpha API's will be 
removed in Spark 1.3 (see SPARK-3445).*</echo>
-                    
<echo>*******************************************************************************************</echo>
-                  </tasks>
-                </configuration>
-              </execution>
-            </executions>
-          </plugin>
-        </plugins>
-      </build>
-      <modules>
-        <module>alpha</module>
-      </modules>
-    </profile>
-
-    <profile>
-      <id>yarn</id>
-      <modules>
-        <module>stable</module>
-      </modules>
+      <id>hadoop-2.2</id>
+      <properties>
+        <jersey.version>1.9</jersey.version>
+      </properties>
+      <dependencies>
+        <dependency>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jetty</artifactId>
+          <version>6.1.26</version>
+          <exclusions>
+            <exclusion>
+              <groupId>org.mortbay.jetty</groupId>
+              <artifactId>servlet-api</artifactId>
+            </exclusion>
+          </exclusions>
+          <scope>test</scope>
+        </dependency>
+        <dependency>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-core</artifactId>
+          <version>${jersey.version}</version>
+          <scope>test</scope>
+        </dependency>
+        <dependency>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-json</artifactId>
+          <version>${jersey.version}</version>
+          <scope>test</scope>
+          <exclusions>
+            <exclusion>
+              <groupId>stax</groupId>
+              <artifactId>stax-api</artifactId>
+            </exclusion>
+          </exclusions>
+        </dependency>
+        <dependency>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-server</artifactId>
+          <version>${jersey.version}</version>
+          <scope>test</scope>
+        </dependency>
+      </dependencies>
     </profile>
   </profiles>
 
@@ -125,38 +144,6 @@
         </configuration>
       </plugin>
       <plugin>
-        <groupId>org.codehaus.mojo</groupId>
-        <artifactId>build-helper-maven-plugin</artifactId>
-        <executions>
-          <execution>
-           <id>add-scala-sources</id>
-            <phase>generate-sources</phase>
-            <goals>
-              <goal>add-source</goal>
-            </goals>
-            <configuration>
-              <sources>
-                <source>src/main/scala</source>
-                <source>../common/src/main/scala</source>
-              </sources>
-            </configuration>
-          </execution>
-          <execution>
-            <id>add-scala-test-sources</id>
-            <phase>generate-test-sources</phase>
-            <goals>
-              <goal>add-test-source</goal>
-            </goals>
-            <configuration>
-              <sources>
-                <source>src/test/scala</source>
-                <source>../common/src/test/scala</source>
-              </sources>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
         <groupId>org.scalatest</groupId>
         <artifactId>scalatest-maven-plugin</artifactId>
         <configuration>
@@ -169,12 +156,6 @@
 
     
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
     
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
-
-    <resources>
-      <resource>
-        <directory>../common/src/main/resources</directory>
-      </resource>
-    </resources>
   </build>
 
 </project>

http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
new file mode 100644
index 0000000..987b337
--- /dev/null
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -0,0 +1,539 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.util.control.NonFatal
+
+import java.io.IOException
+import java.lang.reflect.InvocationTargetException
+import java.net.Socket
+import java.util.concurrent.atomic.AtomicReference
+
+import akka.actor._
+import akka.remote._
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.util.ShutdownHookManager
+import org.apache.hadoop.yarn.api._
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, 
SparkEnv}
+import org.apache.spark.SparkException
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.history.HistoryServer
+import org.apache.spark.scheduler.cluster.YarnSchedulerBackend
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
+import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
+
+/**
+ * Common application master functionality for Spark on Yarn.
+ */
+private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
+  client: YarnRMClient) extends Logging {
+  // TODO: Currently, task to container is computed once (TaskSetManager) - 
which need not be
+  // optimal as more containers are available. Might need to handle this 
better.
+
+  private val sparkConf = new SparkConf()
+  private val yarnConf: YarnConfiguration = 
SparkHadoopUtil.get.newConfiguration(sparkConf)
+    .asInstanceOf[YarnConfiguration]
+  private val isDriver = args.userClass != null
+
+  // Default to numExecutors * 2, with minimum of 3
+  private val maxNumExecutorFailures = 
sparkConf.getInt("spark.yarn.max.executor.failures",
+    sparkConf.getInt("spark.yarn.max.worker.failures", 
math.max(args.numExecutors * 2, 3)))
+
+  @volatile private var exitCode = 0
+  @volatile private var unregistered = false
+  @volatile private var finished = false
+  @volatile private var finalStatus = FinalApplicationStatus.SUCCEEDED
+  @volatile private var finalMsg: String = ""
+  @volatile private var userClassThread: Thread = _
+
+  private var reporterThread: Thread = _
+  private var allocator: YarnAllocator = _
+
+  // Fields used in client mode.
+  private var actorSystem: ActorSystem = null
+  private var actor: ActorRef = _
+
+  // Fields used in cluster mode.
+  private val sparkContextRef = new AtomicReference[SparkContext](null)
+
+  final def run(): Int = {
+    try {
+      val appAttemptId = client.getAttemptId()
+
+      if (isDriver) {
+        // Set the web ui port to be ephemeral for yarn so we don't conflict 
with
+        // other spark processes running on the same box
+        System.setProperty("spark.ui.port", "0")
+
+        // Set the master property to match the requested mode.
+        System.setProperty("spark.master", "yarn-cluster")
+
+        // Propagate the application ID so that YarnClusterSchedulerBackend 
can pick it up.
+        System.setProperty("spark.yarn.app.id", 
appAttemptId.getApplicationId().toString())
+      }
+
+      logInfo("ApplicationAttemptId: " + appAttemptId)
+
+      val fs = FileSystem.get(yarnConf)
+      val cleanupHook = new Runnable {
+        override def run() {
+          // If the SparkContext is still registered, shut it down as a best 
case effort in case
+          // users do not call sc.stop or do System.exit().
+          val sc = sparkContextRef.get()
+          if (sc != null) {
+            logInfo("Invoking sc stop from shutdown hook")
+            sc.stop()
+          }
+          val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
+          val isLastAttempt = client.getAttemptId().getAttemptId() >= 
maxAppAttempts
+
+          if (!finished) {
+            // This happens when the user application calls System.exit(). We 
have the choice
+            // of either failing or succeeding at this point. We report 
success to avoid
+            // retrying applications that have succeeded (System.exit(0)), 
which means that
+            // applications that explicitly exit with a non-zero status will 
also show up as
+            // succeeded in the RM UI.
+            finish(finalStatus,
+              ApplicationMaster.EXIT_SUCCESS,
+              "Shutdown hook called before final status was reported.")
+          }
+
+          if (!unregistered) {
+            // we only want to unregister if we don't want the RM to retry
+            if (finalStatus == FinalApplicationStatus.SUCCEEDED || 
isLastAttempt) {
+              unregister(finalStatus, finalMsg)
+              cleanupStagingDir(fs)
+            }
+          }
+        }
+      }
+
+      // Use higher priority than FileSystem.
+      assert(ApplicationMaster.SHUTDOWN_HOOK_PRIORITY > 
FileSystem.SHUTDOWN_HOOK_PRIORITY)
+      ShutdownHookManager
+        .get().addShutdownHook(cleanupHook, 
ApplicationMaster.SHUTDOWN_HOOK_PRIORITY)
+
+      // Call this to force generation of secret so it gets populated into the
+      // Hadoop UGI. This has to happen before the startUserClass which does a
+      // doAs in order for the credentials to be passed on to the executor 
containers.
+      val securityMgr = new SecurityManager(sparkConf)
+
+      if (isDriver) {
+        runDriver(securityMgr)
+      } else {
+        runExecutorLauncher(securityMgr)
+      }
+    } catch {
+      case e: Exception =>
+        // catch everything else if not specifically handled
+        logError("Uncaught exception: ", e)
+        finish(FinalApplicationStatus.FAILED,
+          ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
+          "Uncaught exception: " + e.getMessage())
+    }
+    exitCode
+  }
+
+  /**
+   * unregister is used to completely unregister the application from the 
ResourceManager.
+   * This means the ResourceManager will not retry the application attempt on 
your behalf if
+   * a failure occurred.
+   */
+  final def unregister(status: FinalApplicationStatus, diagnostics: String = 
null) = synchronized {
+    if (!unregistered) {
+      logInfo(s"Unregistering ApplicationMaster with $status" +
+        Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse(""))
+      unregistered = true
+      client.unregister(status, Option(diagnostics).getOrElse(""))
+    }
+  }
+
  /**
   * Record the final status of the application and stop the helper threads.
   * Idempotent: only the first call takes effect. `exitCode`, `finalStatus` and
   * `finalMsg` are latched here and reported to the ResourceManager later.
   */
  final def finish(status: FinalApplicationStatus, code: Int, msg: String = null) = synchronized {
    if (!finished) {
      val inShutdown = Utils.inShutdown()
      logInfo(s"Final app status: ${status}, exitCode: ${code}" +
        Option(msg).map(msg => s", (reason: $msg)").getOrElse(""))
      exitCode = code
      finalStatus = status
      finalMsg = msg
      // Set `finished` before interrupting, so the interrupted threads observe the
      // terminal state and exit their loops instead of retrying.
      finished = true
      // Skip the interrupts during JVM shutdown (threads may already be tearing
      // down), and never interrupt the current thread itself.
      if (!inShutdown && Thread.currentThread() != reporterThread && reporterThread != null) {
        logDebug("shutting down reporter thread")
        reporterThread.interrupt()
      }
      if (!inShutdown && Thread.currentThread() != userClassThread && userClassThread != null) {
        logDebug("shutting down user thread")
        userClassThread.interrupt()
      }
    }
  }
+
+  private def sparkContextInitialized(sc: SparkContext) = {
+    sparkContextRef.synchronized {
+      sparkContextRef.compareAndSet(null, sc)
+      sparkContextRef.notifyAll()
+    }
+  }
+
+  private def sparkContextStopped(sc: SparkContext) = {
+    sparkContextRef.compareAndSet(sc, null)
+  }
+
+  private def registerAM(uiAddress: String, securityMgr: SecurityManager) = {
+    val sc = sparkContextRef.get()
+
+    val appId = client.getAttemptId().getApplicationId().toString()
+    val historyAddress =
+      sparkConf.getOption("spark.yarn.historyServer.address")
+        .map { address => 
s"${address}${HistoryServer.UI_PATH_PREFIX}/${appId}" }
+        .getOrElse("")
+
+    allocator = client.register(yarnConf,
+      if (sc != null) sc.getConf else sparkConf,
+      if (sc != null) sc.preferredNodeLocationData else Map(),
+      uiAddress,
+      historyAddress,
+      securityMgr)
+
+    allocator.allocateResources()
+    reporterThread = launchReporterThread()
+  }
+
+  private def runDriver(securityMgr: SecurityManager): Unit = {
+    addAmIpFilter()
+    userClassThread = startUserClass()
+
+    // This a bit hacky, but we need to wait until the spark.driver.port 
property has
+    // been set by the Thread executing the user class.
+    val sc = waitForSparkContextInitialized()
+
+    // If there is no SparkContext at this point, just fail the app.
+    if (sc == null) {
+      finish(FinalApplicationStatus.FAILED,
+        ApplicationMaster.EXIT_SC_NOT_INITED,
+        "Timed out waiting for SparkContext.")
+    } else {
+      registerAM(sc.ui.map(_.appUIAddress).getOrElse(""), securityMgr)
+      userClassThread.join()
+    }
+  }
+
+  private def runExecutorLauncher(securityMgr: SecurityManager): Unit = {
+    actorSystem = AkkaUtils.createActorSystem("sparkYarnAM", 
Utils.localHostName, 0,
+      conf = sparkConf, securityManager = securityMgr)._1
+    actor = waitForSparkDriver()
+    addAmIpFilter()
+    registerAM(sparkConf.get("spark.driver.appUIAddress", ""), securityMgr)
+
+    // In client mode the actor will stop the reporter thread.
+    reporterThread.join()
+  }
+
  /**
   * Start the daemon "Reporter" thread: it periodically heartbeats to the
   * ResourceManager via the allocator, fails the app once too many executors
   * have died, and gives up after a configurable number of consecutive
   * failures. Returns the already-started thread.
   */
  private def launchReporterThread(): Thread = {
    // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
    val expiryInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)

    // we want to be reasonably responsive without causing too many requests to RM.
    val schedulerInterval =
      sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000)

    // must be <= expiryInterval / 2.
    val interval = math.max(0, math.min(expiryInterval / 2, schedulerInterval))

    // The number of failures in a row until the Reporter thread gives up.
    val reporterMaxFailures = sparkConf.getInt("spark.yarn.scheduler.reporterThread.maxFailures", 5)

    val t = new Thread {
      override def run() {
        var failureCount = 0
        while (!finished) {
          try {
            if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
              finish(FinalApplicationStatus.FAILED,
                ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
                "Max number of executor failures reached")
            } else {
              logDebug("Sending progress")
              allocator.allocateResources()
            }
            // A successful heartbeat resets the consecutive-failure counter.
            failureCount = 0
          } catch {
            // Interrupt is the normal shutdown signal from finish(); swallow it
            // and let the loop condition (!finished) terminate the thread.
            case i: InterruptedException =>
            case e: Throwable => {
              failureCount += 1
              // Fatal errors fail the app immediately; non-fatal ones are
              // retried up to reporterMaxFailures consecutive times.
              if (!NonFatal(e) || failureCount >= reporterMaxFailures) {
                finish(FinalApplicationStatus.FAILED,
                  ApplicationMaster.EXIT_REPORTER_FAILURE, "Exception was thrown " +
                    s"${failureCount} time(s) from Reporter thread.")

              } else {
                logWarning(s"Reporter thread fails ${failureCount} time(s) in a row.", e)
              }
            }
          }
          try {
            Thread.sleep(interval)
          } catch {
            // Interrupted sleep: re-check `finished` immediately.
            case e: InterruptedException =>
          }
        }
      }
    }
    // setting to daemon status, though this is usually not a good idea.
    t.setDaemon(true)
    t.setName("Reporter")
    t.start()
    logInfo("Started progress reporter thread - sleep time : " + interval)
    t
  }
+
+  /**
+   * Clean up the staging directory.
+   */
+  private def cleanupStagingDir(fs: FileSystem) {
+    var stagingDirPath: Path = null
+    try {
+      val preserveFiles = sparkConf.get("spark.yarn.preserve.staging.files", 
"false").toBoolean
+      if (!preserveFiles) {
+        stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))
+        if (stagingDirPath == null) {
+          logError("Staging directory is null")
+          return
+        }
+        logInfo("Deleting staging directory " + stagingDirPath)
+        fs.delete(stagingDirPath, true)
+      }
+    } catch {
+      case ioe: IOException =>
+        logError("Failed to cleanup staging dir " + stagingDirPath, ioe)
+    }
+  }
+
+  private def waitForSparkContextInitialized(): SparkContext = {
+    logInfo("Waiting for spark context initialization")
+    try {
+      sparkContextRef.synchronized {
+        var count = 0
+        val waitTime = 10000L
+        val numTries = 
sparkConf.getInt("spark.yarn.applicationMaster.waitTries", 10)
+        while (sparkContextRef.get() == null && count < numTries && !finished) 
{
+          logInfo("Waiting for spark context initialization ... " + count)
+          count = count + 1
+          sparkContextRef.wait(waitTime)
+        }
+
+        val sparkContext = sparkContextRef.get()
+        if (sparkContext == null) {
+          logError(("SparkContext did not initialize after waiting for %d ms. 
Please check earlier"
+            + " log output for errors. Failing the 
application.").format(numTries * waitTime))
+        }
+        sparkContext
+      }
+    }
+  }
+
+  private def waitForSparkDriver(): ActorRef = {
+    logInfo("Waiting for Spark driver to be reachable.")
+    var driverUp = false
+    var count = 0
+    val hostport = args.userArgs(0)
+    val (driverHost, driverPort) = Utils.parseHostPort(hostport)
+
+    // spark driver should already be up since it launched us, but we don't 
want to
+    // wait forever, so wait 100 seconds max to match the cluster mode setting.
+    // Leave this config unpublished for now. SPARK-3779 to investigating 
changing
+    // this config to be time based.
+    val numTries = sparkConf.getInt("spark.yarn.applicationMaster.waitTries", 
1000)
+
+    while (!driverUp && !finished && count < numTries) {
+      try {
+        count = count + 1
+        val socket = new Socket(driverHost, driverPort)
+        socket.close()
+        logInfo("Driver now available: %s:%s".format(driverHost, driverPort))
+        driverUp = true
+      } catch {
+        case e: Exception =>
+          logError("Failed to connect to driver at %s:%s, retrying ...".
+            format(driverHost, driverPort))
+          Thread.sleep(100)
+      }
+    }
+
+    if (!driverUp) {
+      throw new SparkException("Failed to connect to driver!")
+    }
+
+    sparkConf.set("spark.driver.host", driverHost)
+    sparkConf.set("spark.driver.port", driverPort.toString)
+
+    val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+      SparkEnv.driverActorSystemName,
+      driverHost,
+      driverPort.toString,
+      YarnSchedulerBackend.ACTOR_NAME)
+    actorSystem.actorOf(Props(new AMActor(driverUrl)), name = "YarnAM")
+  }
+
+  /** Add the Yarn IP filter that is required for properly securing the UI. */
+  private def addAmIpFilter() = {
+    val proxyBase = 
System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
+    val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
+    val params = client.getAmIpFilterParams(yarnConf, proxyBase)
+    if (isDriver) {
+      System.setProperty("spark.ui.filters", amFilter)
+      params.foreach { case (k, v) => 
System.setProperty(s"spark.$amFilter.param.$k", v) }
+    } else {
+      actor ! AddWebUIFilter(amFilter, params.toMap, proxyBase)
+    }
+  }
+
+  /**
+   * Start the user class, which contains the spark driver, in a separate 
Thread.
+   * If the main routine exits cleanly or exits with System.exit(N) for any N
+   * we assume it was successful, for all other cases we assume failure.
+   *
+   * Returns the user thread that was started.
+   */
+  private def startUserClass(): Thread = {
+    logInfo("Starting the user JAR in a separate Thread")
+    System.setProperty("spark.executor.instances", args.numExecutors.toString)
+    val mainMethod = Class.forName(args.userClass, false,
+      Thread.currentThread.getContextClassLoader).getMethod("main", 
classOf[Array[String]])
+
+    val userThread = new Thread {
+      override def run() {
+        try {
+          val mainArgs = new Array[String](args.userArgs.size)
+          args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size)
+          mainMethod.invoke(null, mainArgs)
+          finish(FinalApplicationStatus.SUCCEEDED, 
ApplicationMaster.EXIT_SUCCESS)
+          logDebug("Done running users class")
+        } catch {
+          case e: InvocationTargetException =>
+            e.getCause match {
+              case _: InterruptedException =>
+                // Reporter thread can interrupt to stop user class
+              case e: Exception =>
+                finish(FinalApplicationStatus.FAILED,
+                  ApplicationMaster.EXIT_EXCEPTION_USER_CLASS,
+                  "User class threw exception: " + e.getMessage)
+                // re-throw to get it logged
+                throw e
+            }
+        }
+      }
+    }
+    userThread.setName("Driver")
+    userThread.start()
+    userThread
+  }
+
+  /**
+   * Actor that communicates with the driver in client deploy mode.
+   */
+  private class AMActor(driverUrl: String) extends Actor {
+    var driver: ActorSelection = _
+
+    override def preStart() = {
+      logInfo("Listen to driver: " + driverUrl)
+      driver = context.actorSelection(driverUrl)
+      // Send a hello message to establish the connection, after which
+      // we can monitor Lifecycle Events.
+      driver ! "Hello"
+      driver ! RegisterClusterManager
+      context.system.eventStream.subscribe(self, 
classOf[RemotingLifecycleEvent])
+    }
+
+    override def receive = {
+      case x: DisassociatedEvent =>
+        logInfo(s"Driver terminated or disconnected! Shutting down. $x")
+        finish(FinalApplicationStatus.SUCCEEDED, 
ApplicationMaster.EXIT_SUCCESS)
+
+      case x: AddWebUIFilter =>
+        logInfo(s"Add WebUI Filter. $x")
+        driver ! x
+
+      case RequestExecutors(requestedTotal) =>
+        logInfo(s"Driver requested a total number of $requestedTotal 
executor(s).")
+        Option(allocator) match {
+          case Some(a) => a.requestTotalExecutors(requestedTotal)
+          case None => logWarning("Container allocator is not ready to request 
executors yet.")
+        }
+        sender ! true
+
+      case KillExecutors(executorIds) =>
+        logInfo(s"Driver requested to kill executor(s) 
${executorIds.mkString(", ")}.")
+        Option(allocator) match {
+          case Some(a) => executorIds.foreach(a.killExecutor)
+          case None => logWarning("Container allocator is not ready to kill 
executors yet.")
+        }
+        sender ! true
+    }
+  }
+
+}
+
/**
 * Companion holding the AM entry point, its exit codes, and the hooks used by
 * SparkContext to signal initialization/shutdown from the user class.
 */
object ApplicationMaster extends Logging {

  val SHUTDOWN_HOOK_PRIORITY: Int = 30

  // Exit codes for the different failure causes; the values carry no meaning.
  private val EXIT_SUCCESS = 0
  private val EXIT_UNCAUGHT_EXCEPTION = 10
  private val EXIT_MAX_EXECUTOR_FAILURES = 11
  private val EXIT_REPORTER_FAILURE = 12
  private val EXIT_SC_NOT_INITED = 13
  private val EXIT_SECURITY = 14
  private val EXIT_EXCEPTION_USER_CLASS = 15

  // The single ApplicationMaster instance in this JVM, set from main().
  private var master: ApplicationMaster = _

  def main(args: Array[String]) = {
    SignalLogger.register(log)
    val amArgs = new ApplicationMasterArguments(args)
    SparkHadoopUtil.get.runAsSparkUser { () =>
      master = new ApplicationMaster(amArgs, new YarnRMClientImpl(amArgs))
      System.exit(master.run())
    }
  }

  // Invoked by SparkContext once the user class has created it.
  private[spark] def sparkContextInitialized(sc: SparkContext) = {
    master.sparkContextInitialized(sc)
  }

  // Invoked by SparkContext when the user class stops it.
  private[spark] def sparkContextStopped(sc: SparkContext) = {
    master.sparkContextStopped(sc)
  }

}
+
/**
 * This object does not provide any special functionality. It exists so that it's easy to tell
 * apart the client-mode AM from the cluster-mode AM when using tools such as ps or jps.
 */
object ExecutorLauncher {

  // Pure alias for the real entry point.
  def main(args: Array[String]) = ApplicationMaster.main(args)

}

http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
----------------------------------------------------------------------
diff --git 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
new file mode 100644
index 0000000..d76a632
--- /dev/null
+++ 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import org.apache.spark.util.{MemoryParam, IntParam}
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
+import collection.mutable.ArrayBuffer
+
/**
 * Command-line arguments for the ApplicationMaster / ExecutorLauncher.
 * Parsing happens eagerly in the constructor; an unknown option prints usage
 * to stderr and exits the JVM with code 1.
 */
class ApplicationMasterArguments(val args: Array[String]) {
  var userJar: String = null
  var userClass: String = null
  var userArgs: Seq[String] = Seq[String]()
  var executorMemory = 1024
  var executorCores = 1
  var numExecutors = DEFAULT_NUMBER_EXECUTORS

  parseArgs(args.toList)

  private def parseArgs(inputArgs: List[String]): Unit = {
    val userArgsBuffer = new ArrayBuffer[String]()

    var args = inputArgs

    while (!args.isEmpty) {
      // --num-workers, --worker-memory, and --worker-cores are deprecated since 1.0,
      // the properties with executor in their names are preferred.
      args match {
        case ("--jar") :: value :: tail =>
          userJar = value
          args = tail

        case ("--class") :: value :: tail =>
          userClass = value
          args = tail

        case ("--args" | "--arg") :: value :: tail =>
          userArgsBuffer += value
          args = tail

        case ("--num-workers" | "--num-executors") :: IntParam(value) :: tail =>
          numExecutors = value
          args = tail

        case ("--worker-memory" | "--executor-memory") :: MemoryParam(value) :: tail =>
          executorMemory = value
          args = tail

        case ("--worker-cores" | "--executor-cores") :: IntParam(value) :: tail =>
          executorCores = value
          args = tail

        case _ =>
          printUsageAndExit(1, args)
      }
    }

    // Was `userArgsBuffer.readOnly`: that API is deprecated and returns a view
    // over the mutable buffer; snapshot to an immutable List instead.
    userArgs = userArgsBuffer.toList
  }

  /** Print usage (optionally naming the offending parameter) and exit the JVM. */
  def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
    if (unknownParam != null) {
      System.err.println("Unknown/unsupported param " + unknownParam)
    }
    System.err.println("""
      |Usage: org.apache.spark.deploy.yarn.ApplicationMaster [options]
      |Options:
      |  --jar JAR_PATH       Path to your application's JAR file
      |  --class CLASS_NAME   Name of your application's main class
      |  --args ARGS          Arguments to be passed to your application's main class.
      |                       Multiple invocations are possible, each will be passed in order.
      |  --num-executors NUM    Number of executors to start (Default: 2)
      |  --executor-cores NUM   Number of cores for the executors (Default: 1)
      |  --executor-memory MEM  Memory per executor (e.g. 1000M, 2G) (Default: 1G)
      """.stripMargin)
    System.exit(exitCode)
  }
}
+
object ApplicationMasterArguments {
  // Default executor count used when --num-executors is not supplied.
  val DEFAULT_NUMBER_EXECUTORS = 2
}

http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
new file mode 100644
index 0000000..addaddb
--- /dev/null
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.nio.ByteBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.DataOutputBuffer
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.client.api.{YarnClient, YarnClientApplication}
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.util.Records
+
+import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.deploy.SparkHadoopUtil
+
/**
 * Version of [[org.apache.spark.deploy.yarn.ClientBase]] tailored to YARN's stable API.
 */
private[spark] class Client(
    val args: ClientArguments,
    val hadoopConf: Configuration,
    val sparkConf: SparkConf)
  extends ClientBase with Logging {

  def this(clientArgs: ClientArguments, spConf: SparkConf) =
    this(clientArgs, SparkHadoopUtil.get.newConfiguration(spConf), spConf)

  def this(clientArgs: ClientArguments) = this(clientArgs, new SparkConf())

  val yarnClient = YarnClient.createYarnClient
  val yarnConf = new YarnConfiguration(hadoopConf)

  def stop(): Unit = yarnClient.stop()

  /* -------------------------------------------------------------------------------------
   | The following methods have much in common in the stable and alpha versions of Client,
   | but cannot be implemented in the parent trait due to subtle API differences across
   | hadoop versions.
   * ------------------------------------------------------------------------------------- */

  /**
   * Submit an application running our ApplicationMaster to the ResourceManager.
   *
   * The stable Yarn API provides a convenience method (YarnClient#createApplication) for
   * creating applications and setting up the application submission context. This was not
   * available in the alpha API.
   */
  override def submitApplication(): ApplicationId = {
    yarnClient.init(yarnConf)
    yarnClient.start()

    logInfo("Requesting a new application from cluster with %d NodeManagers"
      .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))

    // Ask the RM for a fresh application id.
    val application = yarnClient.createApplication()
    val response = application.getNewApplicationResponse()
    val appId = response.getApplicationId()

    // Fail fast if the cluster cannot satisfy the AM's resource needs.
    verifyClusterResources(response)

    // Build the container launch context and submission context for the AM.
    val launchContext = createContainerLaunchContext(response)
    val submissionContext = createApplicationSubmissionContext(application, launchContext)

    // Finally, submit and monitor the application.
    logInfo(s"Submitting application ${appId.getId} to ResourceManager")
    yarnClient.submitApplication(submissionContext)
    appId
  }

  /**
   * Set up the context for submitting our ApplicationMaster.
   * This uses the YarnClientApplication not available in the Yarn alpha API.
   */
  def createApplicationSubmissionContext(
      newApp: YarnClientApplication,
      containerContext: ContainerLaunchContext): ApplicationSubmissionContext = {
    val appContext = newApp.getApplicationSubmissionContext
    appContext.setApplicationName(args.appName)
    appContext.setQueue(args.amQueue)
    appContext.setAMContainerSpec(containerContext)
    appContext.setApplicationType("SPARK")
    // The AM container needs its requested memory plus the per-container overhead.
    val amResource = Records.newRecord(classOf[Resource])
    amResource.setMemory(args.amMemory + amMemoryOverhead)
    appContext.setResource(amResource)
    appContext
  }

  /** Set up security tokens for launching our ApplicationMaster container. */
  override def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = {
    // Serialize our credentials and hand them to the container.
    val tokenBuffer = new DataOutputBuffer
    credentials.writeTokenStorageToStream(tokenBuffer)
    amContainer.setTokens(ByteBuffer.wrap(tokenBuffer.getData))
  }

  /** Get the application report from the ResourceManager for an application we have submitted. */
  override def getApplicationReport(appId: ApplicationId): ApplicationReport =
    yarnClient.getApplicationReport(appId)

  /**
   * Return the security token used by this client to communicate with the ApplicationMaster.
   * If no security is enabled, the token returned by the report is null.
   */
  override def getClientToken(report: ApplicationReport): String =
    Option(report.getClientToAMToken).map(_.toString).getOrElse("")
}
+
/** Deprecated standalone entry point; prefer ./bin/spark-submit --master yarn. */
object Client {
  def main(argStrings: Array[String]) {
    if (!sys.props.contains("SPARK_SUBMIT")) {
      println("WARNING: This client is deprecated and will be removed in a " +
        "future version of Spark. Use ./bin/spark-submit with \"--master yarn\"")
    }

    // Set an env variable indicating we are running in YARN mode.
    // Note that any env variable with the SPARK_ prefix gets propagated to all (remote) processes
    System.setProperty("SPARK_YARN_MODE", "true")

    val sparkConf = new SparkConf
    val clientArgs = new ClientArguments(argStrings, sparkConf)
    new Client(clientArgs, sparkConf).run()
  }
}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to