http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
----------------------------------------------------------------------
diff --git 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
new file mode 100644
index 0000000..c439969
--- /dev/null
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
+import org.apache.spark.util.{Utils, IntParam, MemoryParam}
+
// TODO: Add code and support for ensuring that yarn resource 'tasks' are location aware !
/**
 * Command-line arguments for launching Spark on YARN.
 *
 * Parses the raw argument list, fills in defaults from environment variables and
 * Spark configuration, and fails fast on invalid combinations.
 */
private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) {
  var addJars: String = null      // comma-separated local jars for SparkContext.addJar
  var files: String = null        // comma-separated files distributed with the job
  var archives: String = null     // comma-separated archives distributed with the job
  var userJar: String = null      // path to the application jar
  var userClass: String = null    // application main class (cluster mode only)
  var userArgs: Seq[String] = Seq[String]()
  var executorMemory = 1024 // MB
  var executorCores = 1
  var numExecutors = DEFAULT_NUMBER_EXECUTORS
  var amQueue = sparkConf.get("spark.yarn.queue", "default")
  var amMemory: Int = 512 // MB
  var appName: String = "Spark"
  var priority = 0

  private val isDynamicAllocationEnabled =
    sparkConf.getBoolean("spark.dynamicAllocation.enabled", false)

  parseArgs(args.toList)
  loadEnvironmentArgs()
  validateArgs()

  // Additional memory to allocate to containers.
  // For now, use driver's memory overhead as our AM container's memory overhead.
  // NOTE: these are intentionally computed *after* parseArgs() above. Scala initializes
  // vals in declaration order, so computing them before parsing would base the overhead
  // on the hard-coded defaults (512 / 1024 MB) instead of the values the user actually
  // requested via --driver-memory / --executor-memory.
  val amMemoryOverhead = sparkConf.getInt("spark.yarn.driver.memoryOverhead",
    math.max((MEMORY_OVERHEAD_FACTOR * amMemory).toInt, MEMORY_OVERHEAD_MIN))

  val executorMemoryOverhead = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
    math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN))

  /** Load any default arguments provided through environment variables and Spark properties. */
  private def loadEnvironmentArgs(): Unit = {
    // For backward compatibility, SPARK_YARN_DIST_{ARCHIVES/FILES} should be resolved to hdfs://,
    // while spark.yarn.dist.{archives/files} should be resolved to file:// (SPARK-2051).
    files = Option(files)
      .orElse(sys.env.get("SPARK_YARN_DIST_FILES"))
      .orElse(sparkConf.getOption("spark.yarn.dist.files").map(p => Utils.resolveURIs(p)))
      .orNull
    archives = Option(archives)
      .orElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES"))
      .orElse(sparkConf.getOption("spark.yarn.dist.archives").map(p => Utils.resolveURIs(p)))
      .orNull
    // If dynamic allocation is enabled, start at the max number of executors
    if (isDynamicAllocationEnabled) {
      val maxExecutorsConf = "spark.dynamicAllocation.maxExecutors"
      if (!sparkConf.contains(maxExecutorsConf)) {
        throw new IllegalArgumentException(
          s"$maxExecutorsConf must be set if dynamic allocation is enabled!")
      }
      numExecutors = sparkConf.get(maxExecutorsConf).toInt
    }
  }

  /**
   * Fail fast if any arguments provided are invalid.
   * This is intended to be called only after the provided arguments have been parsed.
   */
  private def validateArgs(): Unit = {
    if (numExecutors <= 0) {
      throw new IllegalArgumentException(
        "You must specify at least 1 executor!\n" + getUsageMessage())
    }
  }

  /** Consume the argument list, mutating the fields above as options are recognized. */
  private def parseArgs(inputArgs: List[String]): Unit = {
    val userArgsBuffer = new ArrayBuffer[String]()
    var args = inputArgs

    while (!args.isEmpty) {
      args match {
        case ("--jar") :: value :: tail =>
          userJar = value
          args = tail

        case ("--class") :: value :: tail =>
          userClass = value
          args = tail

        case ("--args" | "--arg") :: value :: tail =>
          if (args(0) == "--args") {
            println("--args is deprecated. Use --arg instead.")
          }
          userArgsBuffer += value
          args = tail

        case ("--master-class" | "--am-class") :: value :: tail =>
          println(s"${args(0)} is deprecated and is not used anymore.")
          args = tail

        case ("--master-memory" | "--driver-memory") :: MemoryParam(value) :: tail =>
          if (args(0) == "--master-memory") {
            println("--master-memory is deprecated. Use --driver-memory instead.")
          }
          amMemory = value
          args = tail

        case ("--num-workers" | "--num-executors") :: IntParam(value) :: tail =>
          if (args(0) == "--num-workers") {
            println("--num-workers is deprecated. Use --num-executors instead.")
          }
          // Dynamic allocation is not compatible with this option
          if (isDynamicAllocationEnabled) {
            throw new IllegalArgumentException("Explicitly setting the number " +
              "of executors is not compatible with spark.dynamicAllocation.enabled!")
          }
          numExecutors = value
          args = tail

        case ("--worker-memory" | "--executor-memory") :: MemoryParam(value) :: tail =>
          if (args(0) == "--worker-memory") {
            println("--worker-memory is deprecated. Use --executor-memory instead.")
          }
          executorMemory = value
          args = tail

        case ("--worker-cores" | "--executor-cores") :: IntParam(value) :: tail =>
          if (args(0) == "--worker-cores") {
            println("--worker-cores is deprecated. Use --executor-cores instead.")
          }
          executorCores = value
          args = tail

        case ("--queue") :: value :: tail =>
          amQueue = value
          args = tail

        case ("--name") :: value :: tail =>
          appName = value
          args = tail

        case ("--addJars") :: value :: tail =>
          addJars = value
          args = tail

        case ("--files") :: value :: tail =>
          files = value
          args = tail

        case ("--archives") :: value :: tail =>
          archives = value
          args = tail

        case Nil =>

        case _ =>
          throw new IllegalArgumentException(getUsageMessage(args))
      }
    }

    // .toList gives an immutable snapshot (ArrayBuffer.readOnly is deprecated).
    userArgs = userArgsBuffer.toList
  }

  /** Usage text, optionally prefixed with the unrecognized parameter that triggered it. */
  private def getUsageMessage(unknownParam: List[String] = null): String = {
    val message = if (unknownParam != null) s"Unknown/unsupported param $unknownParam\n" else ""
    message + """
      |Usage: org.apache.spark.deploy.yarn.Client [options]
      |Options:
      |  --jar JAR_PATH           Path to your application's JAR file (required in yarn-cluster
      |                           mode)
      |  --class CLASS_NAME       Name of your application's main class (required)
      |  --arg ARG                Argument to be passed to your application's main class.
      |                           Multiple invocations are possible, each will be passed in order.
      |  --num-executors NUM      Number of executors to start (Default: 2)
      |  --executor-cores NUM     Number of cores for the executors (Default: 1).
      |  --driver-memory MEM      Memory for driver (e.g. 1000M, 2G) (Default: 512 Mb)
      |  --executor-memory MEM    Memory per executor (e.g. 1000M, 2G) (Default: 1G)
      |  --name NAME              The name of your application (Default: Spark)
      |  --queue QUEUE            The hadoop queue to use for allocation requests (Default:
      |                           'default')
      |  --addJars jars           Comma separated list of local jars that want SparkContext.addJar
      |                           to work with.
      |  --files files            Comma separated list of files to be distributed with the job.
      |  --archives archives      Comma separated list of archives to be distributed with the job.
      """
  }
}

http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
new file mode 100644
index 0000000..f95d723
--- /dev/null
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -0,0 +1,823 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.{HashMap, ListBuffer, Map}
+import scala.util.{Try, Success, Failure}
+
+import com.google.common.base.Objects
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.hadoop.mapred.Master
+import org.apache.hadoop.mapreduce.MRJobConfig
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.apache.hadoop.util.StringUtils
+import org.apache.hadoop.yarn.api._
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
+import org.apache.hadoop.yarn.api.protocolrecords._
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.util.Records
+
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, 
SparkException}
+import org.apache.spark.util.Utils
+
+/**
+ * The entry point (starting in Client#main() and Client#run()) for launching 
Spark on YARN.
+ * The Client submits an application to the YARN ResourceManager.
+ */
+private[spark] trait ClientBase extends Logging {
+  import ClientBase._
+
+  protected val args: ClientArguments
+  protected val hadoopConf: Configuration
+  protected val sparkConf: SparkConf
+  protected val yarnConf: YarnConfiguration
+  protected val credentials = 
UserGroupInformation.getCurrentUser.getCredentials
+  protected val amMemoryOverhead = args.amMemoryOverhead // MB
+  protected val executorMemoryOverhead = args.executorMemoryOverhead // MB
+  private val distCacheMgr = new ClientDistributedCacheManager()
+  private val isLaunchingDriver = args.userClass != null
+
+  /**
+   * Fail fast if we have requested more resources per container than is 
available in the cluster.
+   */
+  protected def verifyClusterResources(newAppResponse: 
GetNewApplicationResponse): Unit = {
+    val maxMem = newAppResponse.getMaximumResourceCapability().getMemory()
+    logInfo("Verifying our application has not requested more than the maximum 
" +
+      s"memory capability of the cluster ($maxMem MB per container)")
+    val executorMem = args.executorMemory + executorMemoryOverhead
+    if (executorMem > maxMem) {
+      throw new IllegalArgumentException(s"Required executor memory 
(${args.executorMemory}" +
+        s"+$executorMemoryOverhead MB) is above the max threshold ($maxMem MB) 
of this cluster!")
+    }
+    val amMem = args.amMemory + amMemoryOverhead
+    if (amMem > maxMem) {
+      throw new IllegalArgumentException(s"Required AM memory 
(${args.amMemory}" +
+        s"+$amMemoryOverhead MB) is above the max threshold ($maxMem MB) of 
this cluster!")
+    }
+    logInfo("Will allocate AM container, with %d MB memory including %d MB 
overhead".format(
+      amMem,
+      amMemoryOverhead))
+
+    // We could add checks to make sure the entire cluster has enough 
resources but that involves
+    // getting all the node reports and computing ourselves.
+  }
+
+  /**
+   * Copy the given file to a remote file system (e.g. HDFS) if needed.
+   * The file is only copied if the source and destination file systems are 
different. This is used
+   * for preparing resources for launching the ApplicationMaster container. 
Exposed for testing.
+   */
+  def copyFileToRemote(
+      destDir: Path,
+      srcPath: Path,
+      replication: Short,
+      setPerms: Boolean = false): Path = {
+    val destFs = destDir.getFileSystem(hadoopConf)
+    val srcFs = srcPath.getFileSystem(hadoopConf)
+    var destPath = srcPath
+    if (!compareFs(srcFs, destFs)) {
+      destPath = new Path(destDir, srcPath.getName())
+      logInfo(s"Uploading resource $srcPath -> $destPath")
+      FileUtil.copy(srcFs, srcPath, destFs, destPath, false, hadoopConf)
+      destFs.setReplication(destPath, replication)
+      if (setPerms) {
+        destFs.setPermission(destPath, new FsPermission(APP_FILE_PERMISSION))
+      }
+    } else {
+      logInfo(s"Source and destination file systems are the same. Not copying 
$srcPath")
+    }
+    // Resolve any symlinks in the URI path so using a "current" symlink to 
point to a specific
+    // version shows the specific version in the distributed cache 
configuration
+    val qualifiedDestPath = destFs.makeQualified(destPath)
+    val fc = FileContext.getFileContext(qualifiedDestPath.toUri(), hadoopConf)
+    fc.resolvePath(qualifiedDestPath)
+  }
+
+  /**
+   * Given a local URI, resolve it and return a qualified local path that 
corresponds to the URI.
+   * This is used for preparing local resources to be included in the 
container launch context.
+   */
+  private def getQualifiedLocalPath(localURI: URI): Path = {
+    val qualifiedURI =
+      if (localURI.getScheme == null) {
+        // If not specified, assume this is in the local filesystem to keep 
the behavior
+        // consistent with that of Hadoop
+        new URI(FileSystem.getLocal(hadoopConf).makeQualified(new 
Path(localURI)).toString)
+      } else {
+        localURI
+      }
+    new Path(qualifiedURI)
+  }
+
+  /**
+   * Upload any resources to the distributed cache if needed. If a resource is 
intended to be
+   * consumed locally, set up the appropriate config for downstream code to 
handle it properly.
+   * This is used for setting up a container launch context for our 
ApplicationMaster.
+   * Exposed for testing.
+   */
+  def prepareLocalResources(appStagingDir: String): HashMap[String, 
LocalResource] = {
+    logInfo("Preparing resources for our AM container")
+    // Upload Spark and the application JAR to the remote file system if 
necessary,
+    // and add them as local resources to the application master.
+    val fs = FileSystem.get(hadoopConf)
+    val dst = new Path(fs.getHomeDirectory(), appStagingDir)
+    val nns = getNameNodesToAccess(sparkConf) + dst
+    obtainTokensForNamenodes(nns, hadoopConf, credentials)
+
+    val replication = sparkConf.getInt("spark.yarn.submit.file.replication",
+      fs.getDefaultReplication(dst)).toShort
+    val localResources = HashMap[String, LocalResource]()
+    FileSystem.mkdirs(fs, dst, new FsPermission(STAGING_DIR_PERMISSION))
+
+    val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
+
+    val oldLog4jConf = Option(System.getenv("SPARK_LOG4J_CONF"))
+    if (oldLog4jConf.isDefined) {
+      logWarning(
+        "SPARK_LOG4J_CONF detected in the system environment. This variable 
has been " +
+        "deprecated. Please refer to the \"Launching Spark on YARN\" 
documentation " +
+        "for alternatives.")
+    }
+
+    /**
+     * Copy the given main resource to the distributed cache if the scheme is 
not "local".
+     * Otherwise, set the corresponding key in our SparkConf to handle it 
downstream.
+     * Each resource is represented by a 4-tuple of:
+     *   (1) destination resource name,
+     *   (2) local path to the resource,
+     *   (3) Spark property key to set if the scheme is not local, and
+     *   (4) whether to set permissions for this resource
+     */
+    List(
+      (SPARK_JAR, sparkJar(sparkConf), CONF_SPARK_JAR, false),
+      (APP_JAR, args.userJar, CONF_SPARK_USER_JAR, true),
+      ("log4j.properties", oldLog4jConf.orNull, null, false)
+    ).foreach { case (destName, _localPath, confKey, setPermissions) =>
+      val localPath: String = if (_localPath != null) _localPath.trim() else ""
+      if (!localPath.isEmpty()) {
+        val localURI = new URI(localPath)
+        if (localURI.getScheme != LOCAL_SCHEME) {
+          val src = getQualifiedLocalPath(localURI)
+          val destPath = copyFileToRemote(dst, src, replication, 
setPermissions)
+          val destFs = FileSystem.get(destPath.toUri(), hadoopConf)
+          distCacheMgr.addResource(destFs, hadoopConf, destPath,
+            localResources, LocalResourceType.FILE, destName, statCache)
+        } else if (confKey != null) {
+          // If the resource is intended for local use only, handle this 
downstream
+          // by setting the appropriate property
+          sparkConf.set(confKey, localPath)
+        }
+      }
+    }
+
+    /**
+     * Do the same for any additional resources passed in through 
ClientArguments.
+     * Each resource category is represented by a 3-tuple of:
+     *   (1) comma separated list of resources in this category,
+     *   (2) resource type, and
+     *   (3) whether to add these resources to the classpath
+     */
+    val cachedSecondaryJarLinks = ListBuffer.empty[String]
+    List(
+      (args.addJars, LocalResourceType.FILE, true),
+      (args.files, LocalResourceType.FILE, false),
+      (args.archives, LocalResourceType.ARCHIVE, false)
+    ).foreach { case (flist, resType, addToClasspath) =>
+      if (flist != null && !flist.isEmpty()) {
+        flist.split(',').foreach { file =>
+          val localURI = new URI(file.trim())
+          if (localURI.getScheme != LOCAL_SCHEME) {
+            val localPath = new Path(localURI)
+            val linkname = 
Option(localURI.getFragment()).getOrElse(localPath.getName())
+            val destPath = copyFileToRemote(dst, localPath, replication)
+            distCacheMgr.addResource(
+              fs, hadoopConf, destPath, localResources, resType, linkname, 
statCache)
+            if (addToClasspath) {
+              cachedSecondaryJarLinks += linkname
+            }
+          } else if (addToClasspath) {
+            // Resource is intended for local use only and should be added to 
the class path
+            cachedSecondaryJarLinks += file.trim()
+          }
+        }
+      }
+    }
+    if (cachedSecondaryJarLinks.nonEmpty) {
+      sparkConf.set(CONF_SPARK_YARN_SECONDARY_JARS, 
cachedSecondaryJarLinks.mkString(","))
+    }
+
+    localResources
+  }
+
+  /**
+   * Set up the environment for launching our ApplicationMaster container.
+   */
+  private def setupLaunchEnv(stagingDir: String): HashMap[String, String] = {
+    logInfo("Setting up the launch environment for our AM container")
+    val env = new HashMap[String, String]()
+    val extraCp = sparkConf.getOption("spark.driver.extraClassPath")
+    populateClasspath(args, yarnConf, sparkConf, env, extraCp)
+    env("SPARK_YARN_MODE") = "true"
+    env("SPARK_YARN_STAGING_DIR") = stagingDir
+    env("SPARK_USER") = 
UserGroupInformation.getCurrentUser().getShortUserName()
+
+    // Set the environment variables to be passed on to the executors.
+    distCacheMgr.setDistFilesEnv(env)
+    distCacheMgr.setDistArchivesEnv(env)
+
+    // Pick up any environment variables for the AM provided through 
spark.yarn.appMasterEnv.*
+    val amEnvPrefix = "spark.yarn.appMasterEnv."
+    sparkConf.getAll
+      .filter { case (k, v) => k.startsWith(amEnvPrefix) }
+      .map { case (k, v) => (k.substring(amEnvPrefix.length), v) }
+      .foreach { case (k, v) => YarnSparkHadoopUtil.addPathToEnvironment(env, 
k, v) }
+
+    // Keep this for backwards compatibility but users should move to the 
config
+    sys.env.get("SPARK_YARN_USER_ENV").foreach { userEnvs =>
+      // Allow users to specify some environment variables.
+      YarnSparkHadoopUtil.setEnvFromInputString(env, userEnvs)
+      // Pass SPARK_YARN_USER_ENV itself to the AM so it can use it to set up 
executor environments.
+      env("SPARK_YARN_USER_ENV") = userEnvs
+    }
+
+    // In cluster mode, if the deprecated SPARK_JAVA_OPTS is set, we need to 
propagate it to
+    // executors. But we can't just set spark.executor.extraJavaOptions, 
because the driver's
+    // SparkContext will not let that set spark* system properties, which is 
expected behavior for
+    // Yarn clients. So propagate it through the environment.
+    //
+    // Note that to warn the user about the deprecation in cluster mode, some 
code from
+    // SparkConf#validateSettings() is duplicated here (to avoid triggering 
the condition
+    // described above).
+    if (isLaunchingDriver) {
+      sys.env.get("SPARK_JAVA_OPTS").foreach { value =>
+        val warning =
+          s"""
+            |SPARK_JAVA_OPTS was detected (set to '$value').
+            |This is deprecated in Spark 1.0+.
+            |
+            |Please instead use:
+            | - ./spark-submit with conf/spark-defaults.conf to set defaults 
for an application
+            | - ./spark-submit with --driver-java-options to set -X options 
for a driver
+            | - spark.executor.extraJavaOptions to set -X options for executors
+          """.stripMargin
+        logWarning(warning)
+        for (proc <- Seq("driver", "executor")) {
+          val key = s"spark.$proc.extraJavaOptions"
+          if (sparkConf.contains(key)) {
+            throw new SparkException(s"Found both $key and SPARK_JAVA_OPTS. 
Use only the former.")
+          }
+        }
+        env("SPARK_JAVA_OPTS") = value
+      }
+    }
+
+    env
+  }
+
+  /**
+   * Set up a ContainerLaunchContext to launch our ApplicationMaster container.
+   * This sets up the launch environment, java options, and the command for 
launching the AM.
+   */
+  protected def createContainerLaunchContext(newAppResponse: 
GetNewApplicationResponse)
+      : ContainerLaunchContext = {
+    logInfo("Setting up container launch context for our AM")
+
+    val appId = newAppResponse.getApplicationId
+    val appStagingDir = getAppStagingDir(appId)
+    val localResources = prepareLocalResources(appStagingDir)
+    val launchEnv = setupLaunchEnv(appStagingDir)
+    val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
+    amContainer.setLocalResources(localResources)
+    amContainer.setEnvironment(launchEnv)
+
+    val javaOpts = ListBuffer[String]()
+
+    // Set the environment variable through a command prefix
+    // to append to the existing value of the variable
+    var prefixEnv: Option[String] = None
+
+    // Add Xmx for AM memory
+    javaOpts += "-Xmx" + args.amMemory + "m"
+
+    val tmpDir = new Path(Environment.PWD.$(), 
YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
+    javaOpts += "-Djava.io.tmpdir=" + tmpDir
+
+    // TODO: Remove once cpuset version is pushed out.
+    // The context is, default gc for server class machines ends up using all 
cores to do gc -
+    // hence if there are multiple containers in same node, Spark GC affects 
all other containers'
+    // performance (which can be that of other Spark containers)
+    // Instead of using this, rely on cpusets by YARN to enforce "proper" 
Spark behavior in
+    // multi-tenant environments. Not sure how default Java GC behaves if it 
is limited to subset
+    // of cores on a node.
+    val useConcurrentAndIncrementalGC = 
launchEnv.get("SPARK_USE_CONC_INCR_GC").exists(_.toBoolean)
+    if (useConcurrentAndIncrementalGC) {
+      // In our expts, using (default) throughput collector has severe perf 
ramifications in
+      // multi-tenant machines
+      javaOpts += "-XX:+UseConcMarkSweepGC"
+      javaOpts += "-XX:+CMSIncrementalMode"
+      javaOpts += "-XX:+CMSIncrementalPacing"
+      javaOpts += "-XX:CMSIncrementalDutyCycleMin=0"
+      javaOpts += "-XX:CMSIncrementalDutyCycle=10"
+    }
+
+    // Forward the Spark configuration to the application master / executors.
+    // TODO: it might be nicer to pass these as an internal environment 
variable rather than
+    // as Java options, due to complications with string parsing of nested 
quotes.
+    for ((k, v) <- sparkConf.getAll) {
+      javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v")
+    }
+
+    // Include driver-specific java options if we are launching a driver
+    if (isLaunchingDriver) {
+      sparkConf.getOption("spark.driver.extraJavaOptions")
+        .orElse(sys.env.get("SPARK_JAVA_OPTS"))
+        .foreach(opts => javaOpts += opts)
+      val libraryPaths = Seq(sys.props.get("spark.driver.extraLibraryPath"),
+        sys.props.get("spark.driver.libraryPath")).flatten
+      if (libraryPaths.nonEmpty) {
+        prefixEnv = Some(Utils.libraryPathEnvPrefix(libraryPaths))
+      }
+    }
+
+    // For log4j configuration to reference
+    javaOpts += ("-Dspark.yarn.app.container.log.dir=" + 
ApplicationConstants.LOG_DIR_EXPANSION_VAR)
+
+    val userClass =
+      if (isLaunchingDriver) {
+        Seq("--class", YarnSparkHadoopUtil.escapeForShell(args.userClass))
+      } else {
+        Nil
+      }
+    val userJar =
+      if (args.userJar != null) {
+        Seq("--jar", args.userJar)
+      } else {
+        Nil
+      }
+    val amClass =
+      if (isLaunchingDriver) {
+        Class.forName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
+      } else {
+        Class.forName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
+      }
+    val userArgs = args.userArgs.flatMap { arg =>
+      Seq("--arg", YarnSparkHadoopUtil.escapeForShell(arg))
+    }
+    val amArgs =
+      Seq(amClass) ++ userClass ++ userJar ++ userArgs ++
+      Seq(
+        "--executor-memory", args.executorMemory.toString + "m",
+        "--executor-cores", args.executorCores.toString,
+        "--num-executors ", args.numExecutors.toString)
+
+    // Command for the ApplicationMaster
+    val commands = prefixEnv ++ Seq(Environment.JAVA_HOME.$() + "/bin/java", 
"-server") ++
+      javaOpts ++ amArgs ++
+      Seq(
+        "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
+        "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
+
+    // TODO: it would be nicer to just make sure there are no null commands 
here
+    val printableCommands = commands.map(s => if (s == null) "null" else 
s).toList
+    amContainer.setCommands(printableCommands)
+
+    
logDebug("===============================================================================")
+    logDebug("Yarn AM launch context:")
+    logDebug(s"    user class: ${Option(args.userClass).getOrElse("N/A")}")
+    logDebug("    env:")
+    launchEnv.foreach { case (k, v) => logDebug(s"        $k -> $v") }
+    logDebug("    resources:")
+    localResources.foreach { case (k, v) => logDebug(s"        $k -> $v")}
+    logDebug("    command:")
+    logDebug(s"        ${printableCommands.mkString(" ")}")
+    
logDebug("===============================================================================")
+
+    // send the acl settings into YARN to control who has access via YARN 
interfaces
+    val securityManager = new SecurityManager(sparkConf)
+    
amContainer.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityManager))
+    setupSecurityToken(amContainer)
+    UserGroupInformation.getCurrentUser().addCredentials(credentials)
+
+    amContainer
+  }
+
+  /**
+   * Report the state of an application until it has exited, either 
successfully or
+   * due to some failure, then return a pair of the yarn application state 
(FINISHED, FAILED,
+   * KILLED, or RUNNING) and the final application state (UNDEFINED, 
SUCCEEDED, FAILED,
+   * or KILLED).
+   *
+   * @param appId ID of the application to monitor.
+   * @param returnOnRunning Whether to also return the application state when 
it is RUNNING.
+   * @param logApplicationReport Whether to log details of the application 
report every iteration.
+   * @return A pair of the yarn application state and the final application 
state.
+   */
+  def monitorApplication(
+      appId: ApplicationId,
+      returnOnRunning: Boolean = false,
+      logApplicationReport: Boolean = true): (YarnApplicationState, 
FinalApplicationStatus) = {
+    val interval = sparkConf.getLong("spark.yarn.report.interval", 1000)
+    var lastState: YarnApplicationState = null
+    while (true) {
+      Thread.sleep(interval)
+      val report = getApplicationReport(appId)
+      val state = report.getYarnApplicationState
+
+      if (logApplicationReport) {
+        logInfo(s"Application report for $appId (state: $state)")
+        val details = Seq[(String, String)](
+          ("client token", getClientToken(report)),
+          ("diagnostics", report.getDiagnostics),
+          ("ApplicationMaster host", report.getHost),
+          ("ApplicationMaster RPC port", report.getRpcPort.toString),
+          ("queue", report.getQueue),
+          ("start time", report.getStartTime.toString),
+          ("final status", report.getFinalApplicationStatus.toString),
+          ("tracking URL", report.getTrackingUrl),
+          ("user", report.getUser)
+        )
+
+        // Use more loggable format if value is null or empty
+        val formattedDetails = details
+          .map { case (k, v) =>
+            val newValue = Option(v).filter(_.nonEmpty).getOrElse("N/A")
+            s"\n\t $k: $newValue" }
+          .mkString("")
+
+        // If DEBUG is enabled, log report details every iteration
+        // Otherwise, log them every time the application changes state
+        if (log.isDebugEnabled) {
+          logDebug(formattedDetails)
+        } else if (lastState != state) {
+          logInfo(formattedDetails)
+        }
+      }
+
+      if (state == YarnApplicationState.FINISHED ||
+        state == YarnApplicationState.FAILED ||
+        state == YarnApplicationState.KILLED) {
+        return (state, report.getFinalApplicationStatus)
+      }
+
+      if (returnOnRunning && state == YarnApplicationState.RUNNING) {
+        return (state, report.getFinalApplicationStatus)
+      }
+
+      lastState = state
+    }
+
+    // Never reached, but keeps compiler happy
+    throw new SparkException("While loop is depleted! This should never 
happen...")
+  }
+
+  /**
+   * Submit an application to the ResourceManager and monitor its state.
+   * This continues until the application has exited for any reason.
+   * If the application finishes with a failed, killed, or undefined status,
+   * throw an appropriate SparkException.
+   */
+  def run(): Unit = {
+    val (yarnApplicationState, finalApplicationStatus) = 
monitorApplication(submitApplication())
+    if (yarnApplicationState == YarnApplicationState.FAILED ||
+      finalApplicationStatus == FinalApplicationStatus.FAILED) {
+      throw new SparkException("Application finished with failed status")
+    }
+    if (yarnApplicationState == YarnApplicationState.KILLED ||
+      finalApplicationStatus == FinalApplicationStatus.KILLED) {
+      throw new SparkException("Application is killed")
+    }
+    if (finalApplicationStatus == FinalApplicationStatus.UNDEFINED) {
+      throw new SparkException("The final status of application is undefined")
+    }
+  }
+
  /* --------------------------------------------------------------------------------------- *
   |  Methods that cannot be implemented here due to API differences across hadoop versions  |
   * --------------------------------------------------------------------------------------- */

  /** Submit an application running our ApplicationMaster to the ResourceManager. */
  def submitApplication(): ApplicationId

  /**
   * Set up security tokens for launching our ApplicationMaster container.
   * Implemented per hadoop version since the token APIs differ across versions.
   */
  protected def setupSecurityToken(containerContext: ContainerLaunchContext): Unit

  /** Get the application report from the ResourceManager for an application we have submitted. */
  protected def getApplicationReport(appId: ApplicationId): ApplicationReport

  /**
   * Return the security token used by this client to communicate with the ApplicationMaster.
   * If no security is enabled, the token returned by the report is null.
   */
  protected def getClientToken(report: ApplicationReport): String
+}
+
+private[spark] object ClientBase extends Logging {
+
  // Alias for the Spark assembly jar and the user jar in the containers' working directory
  val SPARK_JAR: String = "__spark__.jar"
  val APP_JAR: String = "__app__.jar"

  // URI scheme that identifies local resources (already present on the node, not uploaded)
  val LOCAL_SCHEME = "local"

  // Staging directory for any temporary jars or files
  val SPARK_STAGING: String = ".sparkStaging"

  // Location of any user-defined Spark jars
  val CONF_SPARK_JAR = "spark.yarn.jar"
  val ENV_SPARK_JAR = "SPARK_JAR"

  // Internal config to propagate the location of the user's jar to the driver/executors
  val CONF_SPARK_USER_JAR = "spark.yarn.user.jar"

  // Internal config to propagate the locations of any extra jars to add to the classpath
  // of the executors
  val CONF_SPARK_YARN_SECONDARY_JARS = "spark.yarn.secondary.jars"

  // Staging directory is private! -> rwx------ (octal 700)
  val STAGING_DIR_PERMISSION: FsPermission =
    FsPermission.createImmutable(Integer.parseInt("700", 8).toShort)

  // App files are world-wide readable and owner writable -> rw-r--r-- (octal 644)
  val APP_FILE_PERMISSION: FsPermission =
    FsPermission.createImmutable(Integer.parseInt("644", 8).toShort)
+
+  /**
+   * Find the user-defined Spark jar if configured, or return the jar 
containing this
+   * class if not.
+   *
+   * This method first looks in the SparkConf object for the CONF_SPARK_JAR 
key, and in the
+   * user environment if that is not found (for backwards compatibility).
+   */
+  private def sparkJar(conf: SparkConf): String = {
+    if (conf.contains(CONF_SPARK_JAR)) {
+      conf.get(CONF_SPARK_JAR)
+    } else if (System.getenv(ENV_SPARK_JAR) != null) {
+      logWarning(
+        s"$ENV_SPARK_JAR detected in the system environment. This variable has 
been deprecated " +
+        s"in favor of the $CONF_SPARK_JAR configuration variable.")
+      System.getenv(ENV_SPARK_JAR)
+    } else {
+      SparkContext.jarOfClass(this.getClass).head
+    }
+  }
+
+  /**
+   * Return the path to the given application's staging directory.
+   */
+  private def getAppStagingDir(appId: ApplicationId): String = {
+    SPARK_STAGING + Path.SEPARATOR + appId.toString() + Path.SEPARATOR
+  }
+
+  /**
+   * Populate the classpath entry in the given environment map with any 
application
+   * classpath specified through the Hadoop and Yarn configurations.
+   */
+  def populateHadoopClasspath(conf: Configuration, env: HashMap[String, 
String]): Unit = {
+    val classPathElementsToAdd = getYarnAppClasspath(conf) ++ 
getMRAppClasspath(conf)
+    for (c <- classPathElementsToAdd.flatten) {
+      YarnSparkHadoopUtil.addPathToEnvironment(env, 
Environment.CLASSPATH.name, c.trim)
+    }
+  }
+
+  private def getYarnAppClasspath(conf: Configuration): Option[Seq[String]] =
+    Option(conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH)) 
match {
+      case Some(s) => Some(s.toSeq)
+      case None => getDefaultYarnApplicationClasspath
+  }
+
+  private def getMRAppClasspath(conf: Configuration): Option[Seq[String]] =
+    Option(conf.getStrings("mapreduce.application.classpath")) match {
+      case Some(s) => Some(s.toSeq)
+      case None => getDefaultMRApplicationClasspath
+    }
+
  /**
   * Return the default YARN application classpath, if the running YARN version defines one.
   *
   * The DEFAULT_YARN_APPLICATION_CLASSPATH field is read reflectively because it is not
   * present in every supported YARN version (see the MR variant below for the same pattern).
   */
  def getDefaultYarnApplicationClasspath: Option[Seq[String]] = {
    val triedDefault = Try[Seq[String]] {
      val field = classOf[YarnConfiguration].getField("DEFAULT_YARN_APPLICATION_CLASSPATH")
      val value = field.get(null).asInstanceOf[Array[String]]
      value.toSeq
    } recoverWith {
      // A missing field just means this version ships no default; treat as empty.
      case e: NoSuchFieldException => Success(Seq.empty[String])
    }

    // Log the outcome either way; any other reflection failure surfaces as an error.
    triedDefault match {
      case f: Failure[_] =>
        logError("Unable to obtain the default YARN Application classpath.", f.exception)
      case s: Success[_] =>
        logDebug(s"Using the default YARN application classpath: ${s.get.mkString(",")}")
    }

    triedDefault.toOption
  }
+
  /**
   * In Hadoop 0.23, the MR application classpath comes with the YARN application
   * classpath. In Hadoop 2.0, it's an array of Strings, and in 2.2+ it's a String.
   * So we need to use reflection to retrieve it.
   */
  def getDefaultMRApplicationClasspath: Option[Seq[String]] = {
    val triedDefault = Try[Seq[String]] {
      val field = classOf[MRJobConfig].getField("DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH")
      // Normalize both representations (comma-separated String vs. Array[String]) to an array.
      val value = if (field.getType == classOf[String]) {
        StringUtils.getStrings(field.get(null).asInstanceOf[String]).toArray
      } else {
        field.get(null).asInstanceOf[Array[String]]
      }
      value.toSeq
    } recoverWith {
      // A missing field just means this version ships no default; treat as empty.
      case e: NoSuchFieldException => Success(Seq.empty[String])
    }

    triedDefault match {
      case f: Failure[_] =>
        logError("Unable to obtain the default MR Application classpath.", f.exception)
      case s: Success[_] =>
        logDebug(s"Using the default MR application classpath: ${s.get.mkString(",")}")
    }

    triedDefault.toOption
  }
+
  /**
   * Populate the classpath entry in the given environment map.
   * This includes the user jar, Spark jar, and any extra application jars.
   *
   * @param args arguments for the AM being launched; null when setting up executor
   *             containers (see addUserClasspath)
   * @param conf Hadoop configuration supplying the YARN/MR application classpath
   * @param sparkConf Spark configuration
   * @param env environment map whose CLASSPATH entry is appended to
   * @param extraClassPath optional extra entries that are always added first
   */
  def populateClasspath(
      args: ClientArguments,
      conf: Configuration,
      sparkConf: SparkConf,
      env: HashMap[String, String],
      extraClassPath: Option[String] = None): Unit = {
    extraClassPath.foreach(addClasspathEntry(_, env))
    addClasspathEntry(Environment.PWD.$(), env)

    // Normally the users app.jar is last in case conflicts with spark jars;
    // spark.yarn.user.classpath.first flips the ordering so user classes win.
    if (sparkConf.get("spark.yarn.user.classpath.first", "false").toBoolean) {
      addUserClasspath(args, sparkConf, env)
      addFileToClasspath(sparkJar(sparkConf), SPARK_JAR, env)
      populateHadoopClasspath(conf, env)
    } else {
      addFileToClasspath(sparkJar(sparkConf), SPARK_JAR, env)
      populateHadoopClasspath(conf, env)
      addUserClasspath(args, sparkConf, env)
    }

    // Append all jar files under the working directory to the classpath.
    addClasspathEntry(Environment.PWD.$() + Path.SEPARATOR + "*", env)
  }
+
  /**
   * Adds the user jars which have local: URIs (or alternate names, such as APP_JAR) explicitly
   * to the classpath.
   *
   * @param args client arguments when launching the AM; null for executor containers
   * @param conf SparkConf consulted for jar locations when args is null
   * @param env environment map whose CLASSPATH entry is appended to
   */
  private def addUserClasspath(
      args: ClientArguments,
      conf: SparkConf,
      env: HashMap[String, String]): Unit = {

    // If `args` is not null, we are launching an AM container.
    // Otherwise, we are launching executor containers, and the jar locations were
    // propagated through the internal CONF_SPARK_* configs set at submission time.
    val (mainJar, secondaryJars) =
      if (args != null) {
        (args.userJar, args.addJars)
      } else {
        (conf.get(CONF_SPARK_USER_JAR, null), conf.get(CONF_SPARK_YARN_SECONDARY_JARS, null))
      }

    addFileToClasspath(mainJar, APP_JAR, env)
    if (secondaryJars != null) {
      // Comma-separated list; empty segments are skipped.
      secondaryJars.split(",").filter(_.nonEmpty).foreach { jar =>
        addFileToClasspath(jar, null, env)
      }
    }
  }
+
  /**
   * Adds the given path to the classpath, handling "local:" URIs correctly.
   *
   * If an alternate name for the file is given, and it's not a "local:" file, the alternate
   * name will be added to the classpath (relative to the job's work directory).
   *
   * If not a "local:" file and no alternate name, the environment is not modified.
   *
   * @param path      Path to add to classpath (optional).
   * @param fileName  Alternate name for the file (optional).
   * @param env       Map holding the environment variables.
   */
  private def addFileToClasspath(
      path: String,
      fileName: String,
      env: HashMap[String, String]): Unit = {
    if (path != null) {
      // `ignoring` swallows only URISyntaxException (a malformed path falls through to the
      // fileName branch below). The nonlocal `return` is implemented with a control-flow
      // throwable that is NOT a URISyntaxException, so it still exits the whole method.
      scala.util.control.Exception.ignoring(classOf[URISyntaxException]) {
        val uri = new URI(path)
        if (uri.getScheme == LOCAL_SCHEME) {
          addClasspathEntry(uri.getPath, env)
          return
        }
      }
    }
    if (fileName != null) {
      // Non-"local:" (or unparseable) path: reference it by its name in the work directory.
      addClasspathEntry(Environment.PWD.$() + Path.SEPARATOR + fileName, env)
    }
  }
+
  /**
   * Add the given path to the classpath entry of the given environment map.
   * If the classpath is already set, this appends the new path to the existing classpath
   * (delegates the merge semantics to YarnSparkHadoopUtil.addPathToEnvironment).
   */
  private def addClasspathEntry(path: String, env: HashMap[String, String]): Unit =
    YarnSparkHadoopUtil.addPathToEnvironment(env, Environment.CLASSPATH.name, path)
+
+  /**
+   * Get the list of namenodes the user may access.
+   */
+  def getNameNodesToAccess(sparkConf: SparkConf): Set[Path] = {
+    sparkConf.get("spark.yarn.access.namenodes", "")
+      .split(",")
+      .map(_.trim())
+      .filter(!_.isEmpty)
+      .map(new Path(_))
+      .toSet
+  }
+
+  def getTokenRenewer(conf: Configuration): String = {
+    val delegTokenRenewer = Master.getMasterPrincipal(conf)
+    logDebug("delegation token renewer is: " + delegTokenRenewer)
+    if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
+      val errorMessage = "Can't get Master Kerberos principal for use as 
renewer"
+      logError(errorMessage)
+      throw new SparkException(errorMessage)
+    }
+    delegTokenRenewer
+  }
+
+  /**
+   * Obtains tokens for the namenodes passed in and adds them to the 
credentials.
+   */
+  def obtainTokensForNamenodes(
+      paths: Set[Path],
+      conf: Configuration,
+      creds: Credentials): Unit = {
+    if (UserGroupInformation.isSecurityEnabled()) {
+      val delegTokenRenewer = getTokenRenewer(conf)
+      paths.foreach { dst =>
+        val dstFs = dst.getFileSystem(conf)
+        logDebug("getting token for namenode: " + dst)
+        dstFs.addDelegationTokens(delegTokenRenewer, creds)
+      }
+    }
+  }
+
  /**
   * Return whether the two file systems are the same.
   *
   * Compares scheme, (canonicalized) host and port. Host comparison may perform DNS
   * lookups via InetAddress, so this can block on name resolution.
   */
  private def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean = {
    val srcUri = srcFs.getUri()
    val dstUri = destFs.getUri()
    // Different (or missing) schemes can never refer to the same filesystem.
    if (srcUri.getScheme() == null || srcUri.getScheme() != dstUri.getScheme()) {
      return false
    }

    var srcHost = srcUri.getHost()
    var dstHost = dstUri.getHost()

    // In HA or when using viewfs, the host part of the URI may not actually be a host, but the
    // name of the HDFS namespace. Those names won't resolve, so avoid even trying if they
    // match.
    if (srcHost != null && dstHost != null && srcHost != dstHost) {
      try {
        srcHost = InetAddress.getByName(srcHost).getCanonicalHostName()
        dstHost = InetAddress.getByName(dstHost).getCanonicalHostName()
      } catch {
        case e: UnknownHostException =>
          return false
      }
    }

    // Objects.equal is null-safe: both hosts null (e.g. scheme-only URIs) compares equal.
    Objects.equal(srcHost, dstHost) && srcUri.getPort() == dstUri.getPort()
  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
----------------------------------------------------------------------
diff --git 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
new file mode 100644
index 0000000..c592ecf
--- /dev/null
+++ 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.net.URI
+
+import scala.collection.mutable.{HashMap, LinkedHashMap, Map}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.fs.permission.FsAction
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.util.{Records, ConverterUtils}
+
+import org.apache.spark.Logging
+
+/** Client side methods to setup the Hadoop distributed cache */
+private[spark] class ClientDistributedCacheManager() extends Logging {
+
  // Mappings from remote URI to (file size, modification time, visibility), all as strings.
  // NOTE: the first tuple element is the file length (getLen) recorded by addResource,
  // not a full file status. LinkedHashMap preserves insertion order so the env variables
  // built from keys and values stay aligned.
  private val distCacheFiles: Map[String, (String, String, String)] =
    LinkedHashMap[String, (String, String, String)]()
  private val distCacheArchives: Map[String, (String, String, String)] =
    LinkedHashMap[String, (String, String, String)]()
+
+
+  /**
+   * Add a resource to the list of distributed cache resources. This list can
+   * be sent to the ApplicationMaster and possibly the executors so that it can
+   * be downloaded into the Hadoop distributed cache for use by this 
application.
+   * Adds the LocalResource to the localResources HashMap passed in and saves 
+   * the stats of the resources to they can be sent to the executors and 
verified.
+   *
+   * @param fs FileSystem
+   * @param conf Configuration
+   * @param destPath path to the resource
+   * @param localResources localResource hashMap to insert the resource into
+   * @param resourceType LocalResourceType 
+   * @param link link presented in the distributed cache to the destination
+   * @param statCache cache to store the file/directory stats 
+   * @param appMasterOnly Whether to only add the resource to the app master
+   */
+  def addResource(
+      fs: FileSystem,
+      conf: Configuration,
+      destPath: Path, 
+      localResources: HashMap[String, LocalResource],
+      resourceType: LocalResourceType,
+      link: String,
+      statCache: Map[URI, FileStatus],
+      appMasterOnly: Boolean = false): Unit = {
+    val destStatus = fs.getFileStatus(destPath)
+    val amJarRsrc = Records.newRecord(classOf[LocalResource])
+    amJarRsrc.setType(resourceType)
+    val visibility = getVisibility(conf, destPath.toUri(), statCache)
+    amJarRsrc.setVisibility(visibility)
+    amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(destPath))
+    amJarRsrc.setTimestamp(destStatus.getModificationTime())
+    amJarRsrc.setSize(destStatus.getLen())
+    if (link == null || link.isEmpty()) throw new Exception("You must specify 
a valid link name")
+    localResources(link) = amJarRsrc
+    
+    if (!appMasterOnly) {
+      val uri = destPath.toUri()
+      val pathURI = new URI(uri.getScheme(), uri.getAuthority(), 
uri.getPath(), null, link)
+      if (resourceType == LocalResourceType.FILE) {
+        distCacheFiles(pathURI.toString()) = (destStatus.getLen().toString(), 
+          destStatus.getModificationTime().toString(), visibility.name())
+      } else {
+        distCacheArchives(pathURI.toString()) = 
(destStatus.getLen().toString(), 
+          destStatus.getModificationTime().toString(), visibility.name())
+      }
+    }
+  }
+
+  /**
+   * Adds the necessary cache file env variables to the env passed in
+   */
+  def setDistFilesEnv(env: Map[String, String]): Unit = {
+    val (keys, tupleValues) = distCacheFiles.unzip
+    val (sizes, timeStamps, visibilities) = tupleValues.unzip3
+    if (keys.size > 0) {
+      env("SPARK_YARN_CACHE_FILES") = keys.reduceLeft[String] { (acc,n) => acc 
+ "," + n }
+      env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") = 
+        timeStamps.reduceLeft[String] { (acc,n) => acc + "," + n }
+      env("SPARK_YARN_CACHE_FILES_FILE_SIZES") = 
+        sizes.reduceLeft[String] { (acc,n) => acc + "," + n }
+      env("SPARK_YARN_CACHE_FILES_VISIBILITIES") = 
+        visibilities.reduceLeft[String] { (acc,n) => acc + "," + n }
+    }
+  }
+
+  /**
+   * Adds the necessary cache archive env variables to the env passed in
+   */
+  def setDistArchivesEnv(env: Map[String, String]): Unit = {
+    val (keys, tupleValues) = distCacheArchives.unzip
+    val (sizes, timeStamps, visibilities) = tupleValues.unzip3
+    if (keys.size > 0) {
+      env("SPARK_YARN_CACHE_ARCHIVES") = keys.reduceLeft[String] { (acc,n) => 
acc + "," + n }
+      env("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") = 
+        timeStamps.reduceLeft[String] { (acc,n) => acc + "," + n }
+      env("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") =
+        sizes.reduceLeft[String] { (acc,n) => acc + "," + n }
+      env("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") = 
+        visibilities.reduceLeft[String] { (acc,n) => acc + "," + n }
+    }
+  }
+
+  /**
+   * Returns the local resource visibility depending on the cache file 
permissions
+   * @return LocalResourceVisibility
+   */
+  def getVisibility(
+      conf: Configuration,
+      uri: URI,
+      statCache: Map[URI, FileStatus]): LocalResourceVisibility = {
+    if (isPublic(conf, uri, statCache)) {
+      LocalResourceVisibility.PUBLIC
+    } else {
+      LocalResourceVisibility.PRIVATE
+    }
+  }
+
+  /**
+   * Returns a boolean to denote whether a cache file is visible to all 
(public)
+   * @return true if the path in the uri is visible to all, false otherwise
+   */
+  def isPublic(conf: Configuration, uri: URI, statCache: Map[URI, 
FileStatus]): Boolean = {
+    val fs = FileSystem.get(uri, conf)
+    val current = new Path(uri.getPath())
+    // the leaf level file should be readable by others
+    if (!checkPermissionOfOther(fs, current, FsAction.READ, statCache)) {
+      return false
+    }
+    ancestorsHaveExecutePermissions(fs, current.getParent(), statCache)
+  }
+
+  /**
+   * Returns true if all ancestors of the specified path have the 'execute'
+   * permission set for all users (i.e. that other users can traverse
+   * the directory hierarchy to the given path)
+   * @return true if all ancestors have the 'execute' permission set for all 
users
+   */
+  def ancestorsHaveExecutePermissions(
+      fs: FileSystem,
+      path: Path,
+      statCache: Map[URI, FileStatus]): Boolean =  {
+    var current = path
+    while (current != null) {
+      // the subdirs in the path should have execute permissions for others
+      if (!checkPermissionOfOther(fs, current, FsAction.EXECUTE, statCache)) {
+        return false
+      }
+      current = current.getParent()
+    }
+    true
+  }
+
+  /**
+   * Checks for a given path whether the Other permissions on it
+   * imply the permission in the passed FsAction
+   * @return true if the path in the uri is visible to all, false otherwise
+   */
+  def checkPermissionOfOther(
+      fs: FileSystem,
+      path: Path,
+      action: FsAction,
+      statCache: Map[URI, FileStatus]): Boolean = {
+    val status = getFileStatus(fs, path.toUri(), statCache)
+    val perms = status.getPermission()
+    val otherAction = perms.getOtherAction()
+    otherAction.implies(action)
+  }
+
+  /**
+   * Checks to see if the given uri exists in the cache, if it does it
+   * returns the existing FileStatus, otherwise it stats the uri, stores
+   * it in the cache, and returns the FileStatus.
+   * @return FileStatus
+   */
+  def getFileStatus(fs: FileSystem, uri: URI, statCache: Map[URI, 
FileStatus]): FileStatus = {
+    val stat = statCache.get(uri) match {
+      case Some(existstat) => existstat
+      case None => 
+        val newStat = fs.getFileStatus(new Path(uri))
+        statCache.put(uri, newStat)
+        newStat
+    }
+    stat
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
----------------------------------------------------------------------
diff --git 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
new file mode 100644
index 0000000..fdd3c23
--- /dev/null
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.nio.ByteBuffer
+import java.security.PrivilegedExceptionAction
+
+import scala.collection.JavaConversions._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.DataOutputBuffer
+import org.apache.hadoop.net.NetUtils
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.api._
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils
+import org.apache.hadoop.yarn.api.protocolrecords._
+import org.apache.hadoop.yarn.client.api.NMClient
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.ipc.YarnRPC
+import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records}
+
+import org.apache.spark.{SecurityManager, SparkConf, Logging}
+import org.apache.spark.network.util.JavaUtils
+
+
+class ExecutorRunnable(
+    container: Container,
+    conf: Configuration,
+    spConf: SparkConf,
+    masterAddress: String,
+    slaveId: String,
+    hostname: String,
+    executorMemory: Int,
+    executorCores: Int,
+    appId: String,
+    securityMgr: SecurityManager)
+  extends Runnable with ExecutorRunnableUtil with Logging {
+
  // YARN RPC handle, created eagerly from the Hadoop configuration.
  // NOTE(review): not referenced elsewhere in this class as shown — confirm it is still needed.
  var rpc: YarnRPC = YarnRPC.create(conf)
  // Initialized in run() before startContainer uses it to talk to the NodeManager.
  var nmClient: NMClient = _
  // Expose constructor params under the names the ExecutorRunnableUtil trait requires.
  val sparkConf = spConf
  val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
+
+  def run = {
+    logInfo("Starting Executor Container")
+    nmClient = NMClient.createNMClient()
+    nmClient.init(yarnConf)
+    nmClient.start()
+    startContainer
+  }
+
  /**
   * Build the ContainerLaunchContext (local resources, environment, security tokens,
   * launch command, ACLs, and optional shuffle-service data) and ask the NodeManager
   * to start the executor container. Requires nmClient to be initialized (done in run()).
   */
  def startContainer = {
    logInfo("Setting up ContainerLaunchContext")

    val ctx = Records.newRecord(classOf[ContainerLaunchContext])
      .asInstanceOf[ContainerLaunchContext]

    val localResources = prepareLocalResources
    ctx.setLocalResources(localResources)

    ctx.setEnvironment(env)

    // Serialize the current user's credentials into the context so the container
    // can authenticate with secure services.
    val credentials = UserGroupInformation.getCurrentUser().getCredentials()
    val dob = new DataOutputBuffer()
    credentials.writeTokenStorageToStream(dob)
    ctx.setTokens(ByteBuffer.wrap(dob.getData()))

    val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores,
      appId, localResources)

    logInfo(s"Setting up executor with environment: $env")
    logInfo("Setting up executor with commands: " + commands)
    ctx.setCommands(commands)

    ctx.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr))

    // If external shuffle service is enabled, register with the Yarn shuffle service already
    // started on the NodeManager and, if authentication is enabled, provide it with our secret
    // key for fetching shuffle files later
    if (sparkConf.getBoolean("spark.shuffle.service.enabled", false)) {
      val secretString = securityMgr.getSecretKey()
      val secretBytes =
        if (secretString != null) {
          // This conversion must match how the YarnShuffleService decodes our secret
          JavaUtils.stringToBytes(secretString)
        } else {
          // Authentication is not enabled, so just provide dummy metadata
          ByteBuffer.allocate(0)
        }
      ctx.setServiceData(Map[String, ByteBuffer]("spark_shuffle" -> secretBytes))
    }

    // Send the start request to the ContainerManager
    nmClient.startContainer(container, ctx)
  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
----------------------------------------------------------------------
diff --git 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
new file mode 100644
index 0000000..22d73ec
--- /dev/null
+++ 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.net.URI
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.{HashMap, ListBuffer}
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.yarn.api._
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
+
+import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.util.Utils
+
+trait ExecutorRunnableUtil extends Logging {
+
  // Abstract members: supplied by the concrete class that mixes in this trait.
  val yarnConf: YarnConfiguration
  val sparkConf: SparkConf
  // Built once on first access. prepareEnvironment is defined elsewhere in this trait
  // (not visible in this chunk) — presumably it assembles the container env map; verify.
  lazy val env = prepareEnvironment
+
  /**
   * Build the shell command that launches a CoarseGrainedExecutorBackend in the
   * container: JVM binary, memory/GC/java options, backend class and its arguments,
   * with stdout/stderr redirected into the YARN container log directory.
   *
   * @param masterAddress address of the driver's scheduler backend
   * @param slaveId executor ID passed to the backend
   * @param hostname host the container runs on
   * @param executorMemory heap size in MB (used for both -Xms and -Xmx)
   * @param executorCores number of cores passed to the backend
   * @param appId application ID passed to the backend
   * @param localResources localized resources (not consumed here; kept for API symmetry)
   * @return the command as a list of tokens, with nulls replaced by the string "null"
   */
  def prepareCommand(
      masterAddress: String,
      slaveId: String,
      hostname: String,
      executorMemory: Int,
      executorCores: Int,
      appId: String,
      localResources: HashMap[String, LocalResource]): List[String] = {
    // Extra options for the JVM
    val javaOpts = ListBuffer[String]()

    // Set the environment variable through a command prefix
    // to append to the existing value of the variable
    var prefixEnv: Option[String] = None

    // Set the JVM memory
    val executorMemoryString = executorMemory + "m"
    javaOpts += "-Xms" + executorMemoryString + " -Xmx" + executorMemoryString + " "

    // Set extra Java options for the executor, if defined
    sys.props.get("spark.executor.extraJavaOptions").foreach { opts =>
      javaOpts += opts
    }
    sys.env.get("SPARK_JAVA_OPTS").foreach { opts =>
      javaOpts += opts
    }
    sys.props.get("spark.executor.extraLibraryPath").foreach { p =>
      prefixEnv = Some(Utils.libraryPathEnvPrefix(Seq(p)))
    }

    javaOpts += "-Djava.io.tmpdir=" +
      new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)

    // Certain configs need to be passed here because they are needed before the Executor
    // registers with the Scheduler and transfers the spark configs. Since the Executor backend
    // uses Akka to connect to the scheduler, the akka settings are needed as well as the
    // authentication settings.
    sparkConf.getAll.
      filter { case (k, v) => k.startsWith("spark.auth") || k.startsWith("spark.akka") }.
      foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") }

    sparkConf.getAkkaConf.
      foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") }

    // Commenting it out for now - so that people can refer to the properties if required. Remove
    // it once cpuset version is pushed out.
    // The context is, default gc for server class machines end up using all cores to do gc - hence
    // if there are multiple containers in same node, spark gc effects all other containers
    // performance (which can also be other spark containers)
    // Instead of using this, rely on cpusets by YARN to enforce spark behaves 'properly' in
    // multi-tenant environments. Not sure how default java gc behaves if it is limited to subset
    // of cores on a node.
    /*
        else {
          // If no java_opts specified, default to using -XX:+CMSIncrementalMode
          // It might be possible that other modes/config is being done in
          // spark.executor.extraJavaOptions, so we dont want to mess with it.
          // In our expts, using (default) throughput collector has severe perf ramnifications in
          // multi-tennent machines
          // The options are based on
          // http://www.oracle.com/technetwork/java/gc-tuning-5-138395.html#0.0.0.%20When%20to%20Use
          // %20the%20Concurrent%20Low%20Pause%20Collector|outline
          javaOpts += " -XX:+UseConcMarkSweepGC "
          javaOpts += " -XX:+CMSIncrementalMode "
          javaOpts += " -XX:+CMSIncrementalPacing "
          javaOpts += " -XX:CMSIncrementalDutyCycleMin=0 "
          javaOpts += " -XX:CMSIncrementalDutyCycle=10 "
        }
    */

    // For log4j configuration to reference
    javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR)

    val commands = prefixEnv ++ Seq(Environment.JAVA_HOME.$() + "/bin/java",
      "-server",
      // Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling.
      // Not killing the task leaves various aspects of the executor and (to some extent) the jvm in
      // an inconsistent state.
      // TODO: If the OOM is not recoverable by rescheduling it on different node, then do
      // 'something' to fail job ... akin to blacklisting trackers in mapred ?
      "-XX:OnOutOfMemoryError='kill %p'") ++
      javaOpts ++
      Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
      masterAddress.toString,
      slaveId.toString,
      hostname.toString,
      executorCores.toString,
      appId,
      "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
      "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")

    // TODO: it would be nicer to just make sure there are no null commands here
    commands.map(s => if (s == null) "null" else s).toList
  }
+
/**
 * Registers one distributed-cache entry as a YARN [[LocalResource]], keyed
 * in `localResources` by the link name (the URI fragment).
 *
 * @param file URI string of the cached entry; its fragment names the link
 * @param rtype whether the entry is a plain FILE or an ARCHIVE
 * @param localResources map the new resource record is inserted into
 * @param timestamp modification time of the entry, a stringified long
 * @param size size in bytes of the entry, a stringified long
 * @param vis name of a LocalResourceVisibility value (e.g. PUBLIC)
 */
private def setupDistributedCache(
    file: String,
    rtype: LocalResourceType,
    localResources: HashMap[String, LocalResource],
    timestamp: String,
    size: String,
    vis: String): Unit = {
  val uri = new URI(file)
  val resource = Records.newRecord(classOf[LocalResource])
  resource.setResource(ConverterUtils.getYarnUrlFromURI(uri))
  resource.setType(rtype)
  resource.setVisibility(LocalResourceVisibility.valueOf(vis))
  resource.setTimestamp(timestamp.toLong)
  resource.setSize(size.toLong)
  localResources(uri.getFragment()) = resource
}
+
/**
 * Builds this container's LocalResource map from the SPARK_YARN_CACHE_*
 * environment variables published by the client.
 *
 * Each cache entry type (plain files, archives) is described by four
 * parallel comma-separated lists under a common env-var prefix:
 * the paths themselves, `_TIME_STAMPS`, `_FILE_SIZES` and `_VISIBILITIES`.
 *
 * @return map from link name (URI fragment) to its LocalResource record
 */
def prepareLocalResources: HashMap[String, LocalResource] = {
  logInfo("Preparing Local resources")
  val localResources = HashMap[String, LocalResource]()

  // Register every cache entry of one type; no-op when the prefix env var
  // is absent. Factored out because FILES and ARCHIVES used identical logic.
  def cacheEntries(prefix: String, rtype: LocalResourceType): Unit = {
    if (System.getenv(prefix) != null) {
      val distPaths = System.getenv(prefix).split(',')
      val timeStamps = System.getenv(prefix + "_TIME_STAMPS").split(',')
      val fileSizes = System.getenv(prefix + "_FILE_SIZES").split(',')
      val visibilities = System.getenv(prefix + "_VISIBILITIES").split(',')
      // `indices` instead of `0 to length - 1`; the four lists are parallel.
      for (i <- distPaths.indices) {
        setupDistributedCache(distPaths(i), rtype, localResources,
          timeStamps(i), fileSizes(i), visibilities(i))
      }
    }
  }

  cacheEntries("SPARK_YARN_CACHE_FILES", LocalResourceType.FILE)
  cacheEntries("SPARK_YARN_CACHE_ARCHIVES", LocalResourceType.ARCHIVE)

  logInfo("Prepared Local resources " + localResources)
  localResources
}
+
/**
 * Assembles the environment-variable map for the executor's container:
 * classpath entries, configured executor env vars, the deprecated
 * SPARK_YARN_USER_ENV setting, and every SPARK* variable of this process.
 */
def prepareEnvironment: HashMap[String, String] = {
  val env = new HashMap[String, String]()
  val extraClassPath = sparkConf.getOption("spark.executor.extraClassPath")
  ClientBase.populateClasspath(null, yarnConf, sparkConf, env, extraClassPath)

  // Each executor env var set through the config is treated as a path and
  // appended - kept for backward compatibility and consistency with hadoop.
  for ((key, value) <- sparkConf.getExecutorEnv) {
    YarnSparkHadoopUtil.addPathToEnvironment(env, key, value)
  }

  // Deprecated alternative to the config; kept for backwards compatibility.
  for (userEnvs <- sys.env.get("SPARK_YARN_USER_ENV")) {
    YarnSparkHadoopUtil.setEnvFromInputString(env, userEnvs)
  }

  // Forward every SPARK* variable from this process into the container env.
  for ((k, v) <- System.getenv().filterKeys(_.startsWith("SPARK"))) {
    env(k) = v
  }
  env
}
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
----------------------------------------------------------------------
diff --git 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
new file mode 100644
index 0000000..2bbf5d7
--- /dev/null
+++ 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.{ArrayBuffer, HashMap}
+
+import org.apache.spark.{SecurityManager, SparkConf} 
+import org.apache.spark.scheduler.SplitInfo
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse
+import org.apache.hadoop.yarn.client.api.AMRMClient
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.hadoop.yarn.util.Records
+
+/**
+ * Acquires resources for executors from a ResourceManager and launches 
executors in new containers.
+ */
/**
 * Acquires resources for executors from a ResourceManager and launches
 * executors in new containers, using the stable YARN AMRMClient API.
 */
private[yarn] class YarnAllocationHandler(
    conf: Configuration,
    sparkConf: SparkConf,
    amClient: AMRMClient[ContainerRequest],
    appAttemptId: ApplicationAttemptId,
    args: ApplicationMasterArguments,
    preferredNodes: collection.Map[String, collection.Set[SplitInfo]],
    securityMgr: SecurityManager)
  extends YarnAllocator(conf, sparkConf, appAttemptId, args, preferredNodes, securityMgr) {

  /** Hands a finished container back to the ResourceManager. */
  override protected def releaseContainer(container: Container) = {
    amClient.releaseAssignedContainer(container.getId())
  }

  // `pending` isn't used on stable as the AMRMClient handles incremental asks.
  override protected def allocateContainers(count: Int, pending: Int): YarnAllocateResponse = {
    addResourceRequests(count)

    // We have already set the container request. Poll the ResourceManager for a response.
    // This doubles as a heartbeat if there are no pending container requests.
    val progressIndicator = 0.1f
    new StableAllocateResponse(amClient.allocate(progressIndicator))
  }

  /**
   * Derives rack-local requests from host-local ones: each host is mapped to
   * its rack (when the lookup succeeds) and one batch of requests per
   * (rack, count) pair is generated.
   */
  private def createRackResourceRequests(
      hostContainers: ArrayBuffer[ContainerRequest]
    ): ArrayBuffer[ContainerRequest] = {
    // Generate modified racks and new set of hosts under it before issuing requests.
    val rackToCounts = new HashMap[String, Int]()

    for (container <- hostContainers) {
      val candidateHost = container.getNodes.last
      assert(YarnSparkHadoopUtil.ANY_HOST != candidateHost)

      val rack = YarnSparkHadoopUtil.lookupRack(conf, candidateHost)
      if (rack != null) {
        // Count how many requested hosts sit on each known rack.
        rackToCounts.put(rack, rackToCounts.getOrElse(rack, 0) + 1)
      }
    }

    val requestedContainers = new ArrayBuffer[ContainerRequest](rackToCounts.size)
    for ((rack, count) <- rackToCounts) {
      requestedContainers ++= createResourceRequests(
        AllocationType.RACK,
        rack,
        count,
        YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)
    }

    requestedContainers
  }

  /**
   * Registers `numExecutors` container requests with the AMRMClient,
   * preferring the hosts in `preferredHostToCount` (plus their racks) and
   * always including ANY-placement requests as the fallback.
   */
  private def addResourceRequests(numExecutors: Int) {
    val containerRequests: List[ContainerRequest] =
      if (numExecutors <= 0) {
        logDebug("numExecutors: " + numExecutors)
        List()
      } else if (preferredHostToCount.isEmpty) {
        logDebug("host preferences is empty")
        createResourceRequests(
          AllocationType.ANY,
          resource = null,
          numExecutors,
          YarnSparkHadoopUtil.RM_REQUEST_PRIORITY).toList
      } else {
        // Request for all hosts in preferred nodes and for numExecutors -
        // candidates.size, request by default allocation policy.
        val hostContainerRequests = new ArrayBuffer[ContainerRequest](preferredHostToCount.size)
        for ((candidateHost, candidateCount) <- preferredHostToCount) {
          // Only ask for what this host still lacks.
          val requiredCount = candidateCount - allocatedContainersOnHost(candidateHost)

          if (requiredCount > 0) {
            hostContainerRequests ++= createResourceRequests(
              AllocationType.HOST,
              candidateHost,
              requiredCount,
              YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)
          }
        }
        val rackContainerRequests: List[ContainerRequest] = createRackResourceRequests(
          hostContainerRequests).toList

        val anyContainerRequests = createResourceRequests(
          AllocationType.ANY,
          resource = null,
          numExecutors,
          YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)

        val containerRequestBuffer = new ArrayBuffer[ContainerRequest](
          hostContainerRequests.size + rackContainerRequests.size + anyContainerRequests.size)

        containerRequestBuffer ++= hostContainerRequests
        containerRequestBuffer ++= rackContainerRequests
        containerRequestBuffer ++= anyContainerRequests
        containerRequestBuffer.toList
      }

    for (request <- containerRequests) {
      amClient.addContainerRequest(request)
    }

    for (request <- containerRequests) {
      val nodes = request.getNodes
      // `val` rather than `var`: never reassigned after initialization.
      val hostStr = if (nodes == null || nodes.isEmpty) {
        "Any"
      } else {
        nodes.last
      }
      // Closing parenthesis added: the format string previously left the
      // "(host: ..." group unbalanced in the log output.
      logInfo("Container request (host: %s, priority: %s, capability: %s)".format(
        hostStr,
        request.getPriority().getPriority,
        request.getCapability))
    }
  }

  /**
   * Builds `numExecutors` requests of the given allocation type. HOST
   * requests also prime the global host->rack mapping; ANY requests carry
   * neither hosts nor racks.
   */
  private def createResourceRequests(
      requestType: AllocationType.AllocationType,
      resource: String,
      numExecutors: Int,
      priority: Int
    ): ArrayBuffer[ContainerRequest] = {

    // If hostname is specified, then we need at least two requests - node local and rack local.
    // There must be a third request, which is ANY. That will be specially handled.
    requestType match {
      case AllocationType.HOST => {
        assert(YarnSparkHadoopUtil.ANY_HOST != resource)
        val hostname = resource
        val nodeLocal = constructContainerRequests(
          Array(hostname),
          racks = null,
          numExecutors,
          priority)

        // Add `hostname` to the global (singleton) host->rack mapping in YarnAllocationHandler.
        YarnSparkHadoopUtil.populateRackInfo(conf, hostname)
        nodeLocal
      }
      case AllocationType.RACK => {
        val rack = resource
        constructContainerRequests(hosts = null, Array(rack), numExecutors, priority)
      }
      case AllocationType.ANY => constructContainerRequests(
        hosts = null, racks = null, numExecutors, priority)
      case _ => throw new IllegalArgumentException(
        "Unexpected/unsupported request type: " + requestType)
    }
  }

  /** Creates one ContainerRequest per executor sharing the same resource and priority records. */
  private def constructContainerRequests(
      hosts: Array[String],
      racks: Array[String],
      numExecutors: Int,
      priority: Int
    ): ArrayBuffer[ContainerRequest] = {

    // Ask for the executor heap plus the off-heap overhead.
    val memoryRequest = executorMemory + memoryOverhead
    val resource = Resource.newInstance(memoryRequest, executorCores)

    val prioritySetting = Records.newRecord(classOf[Priority])
    prioritySetting.setPriority(priority)

    val requests = new ArrayBuffer[ContainerRequest]()
    for (i <- 0 until numExecutors) {
      requests += new ContainerRequest(resource, hosts, racks, prioritySetting)
    }
    requests
  }

  /** Adapts the stable API's AllocateResponse to the internal YarnAllocateResponse interface. */
  private class StableAllocateResponse(response: AllocateResponse) extends YarnAllocateResponse {
    override def getAllocatedContainers() = response.getAllocatedContainers()
    override def getAvailableResources() = response.getAvailableResources()
    override def getCompletedContainersStatuses() = response.getCompletedContainersStatuses()
  }

}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to