Repository: spark
Updated Branches:
  refs/heads/master 2104931d7 -> ab6259566


[SPARK-4352] [YARN] [WIP] Incorporate locality preferences in dynamic 
allocation requests

Currently there is no locality preference for container requests in YARN mode, 
which hurts performance when data has to be fetched remotely. This change adds 
locality preferences to container requests in YARN dynamic allocation mode.

Ping sryza, please help to review, thanks a lot.

Author: jerryshao <[email protected]>

Closes #6394 from jerryshao/SPARK-4352 and squashes the following commits:

d45fecb [jerryshao] Add documents
6c3fe5c [jerryshao] Fix bug
8db6c0e [jerryshao] Further address the comments
2e2b2cb [jerryshao] Fix rebase compiling problem
ce5f096 [jerryshao] Fix style issue
7f7df95 [jerryshao] Fix rebase issue
9ca9e07 [jerryshao] Code refactor according to comments
d3e4236 [jerryshao] Further address the comments
5e7a593 [jerryshao] Fix bug introduced code rebase
9ca7783 [jerryshao] Style changes
08317f9 [jerryshao] code and comment refines
65b2423 [jerryshao] Further address the comments
a27c587 [jerryshao] address the comment
27faabc [jerryshao] redundant code remove
9ce06a1 [jerryshao] refactor the code
f5ba27b [jerryshao] Style fix
2c6cc8a [jerryshao] Fix bug and add unit tests
0757335 [jerryshao] Consider the distribution of existed containers to 
recalculate the new container requests
0ad66ff [jerryshao] Fix compile bugs
1c20381 [jerryshao] Minor fix
5ef2dc8 [jerryshao] Add docs and improve the code
3359814 [jerryshao] Fix rebase and test bugs
0398539 [jerryshao] reinitialize the new implementation
67596d6 [jerryshao] Still fix the code
654e1d2 [jerryshao] Fix some bugs
45b1c89 [jerryshao] Further polish the algorithm
dea0152 [jerryshao] Enable node locality information in YarnAllocator
74bbcc6 [jerryshao] Support node locality for dynamic allocation initial commit


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ab625956
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ab625956
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ab625956

Branch: refs/heads/master
Commit: ab625956616664c2b4861781a578311da75a9ae4
Parents: 2104931
Author: jerryshao <[email protected]>
Authored: Mon Jul 27 15:46:35 2015 -0700
Committer: Sandy Ryza <[email protected]>
Committed: Mon Jul 27 15:46:35 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/ExecutorAllocationClient.scala |  18 +-
 .../spark/ExecutorAllocationManager.scala       |  62 ++++++-
 .../scala/org/apache/spark/SparkContext.scala   |  25 ++-
 .../apache/spark/scheduler/DAGScheduler.scala   |  26 ++-
 .../org/apache/spark/scheduler/Stage.scala      |   7 +-
 .../org/apache/spark/scheduler/StageInfo.scala  |  13 +-
 .../cluster/CoarseGrainedClusterMessage.scala   |   6 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala |  32 +++-
 .../cluster/YarnSchedulerBackend.scala          |   3 +-
 .../spark/ExecutorAllocationManagerSuite.scala  |  55 +++++-
 .../apache/spark/HeartbeatReceiverSuite.scala   |   7 +-
 .../spark/deploy/yarn/ApplicationMaster.scala   |   5 +-
 ...ityPreferredContainerPlacementStrategy.scala | 182 +++++++++++++++++++
 .../spark/deploy/yarn/YarnAllocator.scala       |  47 ++++-
 .../yarn/ContainerPlacementStrategySuite.scala  | 125 +++++++++++++
 .../spark/deploy/yarn/YarnAllocatorSuite.scala  |  14 +-
 16 files changed, 578 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ab625956/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala 
b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
index 443830f..842bfdb 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
@@ -24,11 +24,23 @@ package org.apache.spark
 private[spark] trait ExecutorAllocationClient {
 
   /**
-   * Express a preference to the cluster manager for a given total number of 
executors.
-   * This can result in canceling pending requests or filing additional 
requests.
+   * Update the cluster manager on our scheduling needs. Three bits of 
information are included
+   * to help it make decisions.
+   * @param numExecutors The total number of executors we'd like to have. The 
cluster manager
+   *                     shouldn't kill any running executor to reach this 
number, but,
+   *                     if all existing executors were to die, this is the 
number of executors
+   *                     we'd want to be allocated.
+   * @param localityAwareTasks The number of tasks in all active stages that 
have a locality
+   *                           preferences. This includes running, pending, 
and completed tasks.
+   * @param hostToLocalTaskCount A map of hosts to the number of tasks from 
all active stages
+   *                             that would like to run on that host.
+   *                             This includes running, pending, and completed 
tasks.
    * @return whether the request is acknowledged by the cluster manager.
    */
-  private[spark] def requestTotalExecutors(numExecutors: Int): Boolean
+  private[spark] def requestTotalExecutors(
+      numExecutors: Int,
+      localityAwareTasks: Int,
+      hostToLocalTaskCount: Map[String, Int]): Boolean
 
   /**
    * Request an additional number of executors from the cluster manager.

http://git-wip-us.apache.org/repos/asf/spark/blob/ab625956/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala 
b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 648bcfe..1877aaf 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -161,6 +161,12 @@ private[spark] class ExecutorAllocationManager(
   //   (2) an executor idle timeout has elapsed.
   @volatile private var initializing: Boolean = true
 
+  // Number of locality aware tasks, used for executor placement.
+  private var localityAwareTasks = 0
+
+  // Host to possible task running on it, used for executor placement.
+  private var hostToLocalTaskCount: Map[String, Int] = Map.empty
+
   /**
    * Verify that the settings specified through the config are valid.
    * If not, throw an appropriate exception.
@@ -295,7 +301,7 @@ private[spark] class ExecutorAllocationManager(
 
       // If the new target has not changed, avoid sending a message to the 
cluster manager
       if (numExecutorsTarget < oldNumExecutorsTarget) {
-        client.requestTotalExecutors(numExecutorsTarget)
+        client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, 
hostToLocalTaskCount)
         logDebug(s"Lowering target number of executors to $numExecutorsTarget 
(previously " +
           s"$oldNumExecutorsTarget) because not all requested executors are 
actually needed")
       }
@@ -349,7 +355,8 @@ private[spark] class ExecutorAllocationManager(
       return 0
     }
 
-    val addRequestAcknowledged = testing || 
client.requestTotalExecutors(numExecutorsTarget)
+    val addRequestAcknowledged = testing ||
+      client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, 
hostToLocalTaskCount)
     if (addRequestAcknowledged) {
       val executorsString = "executor" + { if (delta > 1) "s" else "" }
       logInfo(s"Requesting $delta new $executorsString because tasks are 
backlogged" +
@@ -519,6 +526,12 @@ private[spark] class ExecutorAllocationManager(
     // Number of tasks currently running on the cluster.  Should be 0 when no 
stages are active.
     private var numRunningTasks: Int = _
 
+    // stageId to tuple (the number of task with locality preferences, a map 
where each pair is a
+    // node and the number of tasks that would like to be scheduled on that 
node) map,
+    // maintain the executor placement hints for each stage Id used by 
resource framework to better
+    // place the executors.
+    private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, 
(Int, Map[String, Int])]
+
     override def onStageSubmitted(stageSubmitted: 
SparkListenerStageSubmitted): Unit = {
       initializing = false
       val stageId = stageSubmitted.stageInfo.stageId
@@ -526,6 +539,24 @@ private[spark] class ExecutorAllocationManager(
       allocationManager.synchronized {
         stageIdToNumTasks(stageId) = numTasks
         allocationManager.onSchedulerBacklogged()
+
+        // Compute the number of tasks requested by the stage on each host
+        var numTasksPending = 0
+        val hostToLocalTaskCountPerStage = new mutable.HashMap[String, Int]()
+        stageSubmitted.stageInfo.taskLocalityPreferences.foreach { locality =>
+          if (!locality.isEmpty) {
+            numTasksPending += 1
+            locality.foreach { location =>
+              val count = 
hostToLocalTaskCountPerStage.getOrElse(location.host, 0) + 1
+              hostToLocalTaskCountPerStage(location.host) = count
+            }
+          }
+        }
+        stageIdToExecutorPlacementHints.put(stageId,
+          (numTasksPending, hostToLocalTaskCountPerStage.toMap))
+
+        // Update the executor placement hints
+        updateExecutorPlacementHints()
       }
     }
 
@@ -534,6 +565,10 @@ private[spark] class ExecutorAllocationManager(
       allocationManager.synchronized {
         stageIdToNumTasks -= stageId
         stageIdToTaskIndices -= stageId
+        stageIdToExecutorPlacementHints -= stageId
+
+        // Update the executor placement hints
+        updateExecutorPlacementHints()
 
         // If this is the last stage with pending tasks, mark the scheduler 
queue as empty
         // This is needed in case the stage is aborted for any reason
@@ -637,6 +672,29 @@ private[spark] class ExecutorAllocationManager(
     def isExecutorIdle(executorId: String): Boolean = {
       !executorIdToTaskIds.contains(executorId)
     }
+
+    /**
+     * Update the Executor placement hints (the number of tasks with locality 
preferences,
+     * a map where each pair is a node and the number of tasks that would like 
to be scheduled
+     * on that node).
+     *
+     * These hints are updated when stages arrive and complete, so are not 
up-to-date at task
+     * granularity within stages.
+     */
+    def updateExecutorPlacementHints(): Unit = {
+      var localityAwareTasks = 0
+      val localityToCount = new mutable.HashMap[String, Int]()
+      stageIdToExecutorPlacementHints.values.foreach { case (numTasksPending, 
localities) =>
+        localityAwareTasks += numTasksPending
+        localities.foreach { case (hostname, count) =>
+          val updatedCount = localityToCount.getOrElse(hostname, 0) + count
+          localityToCount(hostname) = updatedCount
+        }
+      }
+
+      allocationManager.localityAwareTasks = localityAwareTasks
+      allocationManager.hostToLocalTaskCount = localityToCount.toMap
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/ab625956/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 6a6b94a..ac6ac6c 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1382,16 +1382,29 @@ class SparkContext(config: SparkConf) extends Logging 
with ExecutorAllocationCli
   }
 
   /**
-   * Express a preference to the cluster manager for a given total number of 
executors.
-   * This can result in canceling pending requests or filing additional 
requests.
-   * This is currently only supported in YARN mode. Return whether the request 
is received.
-   */
-  private[spark] override def requestTotalExecutors(numExecutors: Int): 
Boolean = {
+   * Update the cluster manager on our scheduling needs. Three bits of 
information are included
+   * to help it make decisions.
+   * @param numExecutors The total number of executors we'd like to have. The 
cluster manager
+   *                     shouldn't kill any running executor to reach this 
number, but,
+   *                     if all existing executors were to die, this is the 
number of executors
+   *                     we'd want to be allocated.
+   * @param localityAwareTasks The number of tasks in all active stages that 
have a locality
+   *                           preferences. This includes running, pending, 
and completed tasks.
+   * @param hostToLocalTaskCount A map of hosts to the number of tasks from 
all active stages
+   *                             that would like to run on that host.
+   *                             This includes running, pending, and completed 
tasks.
+   * @return whether the request is acknowledged by the cluster manager.
+   */
+  private[spark] override def requestTotalExecutors(
+      numExecutors: Int,
+      localityAwareTasks: Int,
+      hostToLocalTaskCount: scala.collection.immutable.Map[String, Int]
+    ): Boolean = {
     assert(supportDynamicAllocation,
       "Requesting executors is currently only supported in YARN and Mesos 
modes")
     schedulerBackend match {
       case b: CoarseGrainedSchedulerBackend =>
-        b.requestTotalExecutors(numExecutors)
+        b.requestTotalExecutors(numExecutors, localityAwareTasks, 
hostToLocalTaskCount)
       case _ =>
         logWarning("Requesting executors is only supported in coarse-grained 
mode")
         false

http://git-wip-us.apache.org/repos/asf/spark/blob/ab625956/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b6a833b..cdf6078 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -790,8 +790,28 @@ class DAGScheduler(
     // serializable. If tasks are not serializable, a 
SparkListenerStageCompleted event
     // will be posted, which should always come after a corresponding 
SparkListenerStageSubmitted
     // event.
-    stage.makeNewStageAttempt(partitionsToCompute.size)
     outputCommitCoordinator.stageStart(stage.id)
+    val taskIdToLocations = try {
+      stage match {
+        case s: ShuffleMapStage =>
+          partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, 
id))}.toMap
+        case s: ResultStage =>
+          val job = s.resultOfJob.get
+          partitionsToCompute.map { id =>
+            val p = job.partitions(id)
+            (id, getPreferredLocs(stage.rdd, p))
+          }.toMap
+      }
+    } catch {
+      case NonFatal(e) =>
+        stage.makeNewStageAttempt(partitionsToCompute.size)
+        listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, 
properties))
+        abortStage(stage, s"Task creation failed: 
$e\n${e.getStackTraceString}")
+        runningStages -= stage
+        return
+    }
+
+    stage.makeNewStageAttempt(partitionsToCompute.size, 
taskIdToLocations.values.toSeq)
     listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
 
     // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it 
multiple times.
@@ -830,7 +850,7 @@ class DAGScheduler(
       stage match {
         case stage: ShuffleMapStage =>
           partitionsToCompute.map { id =>
-            val locs = getPreferredLocs(stage.rdd, id)
+            val locs = taskIdToLocations(id)
             val part = stage.rdd.partitions(id)
             new ShuffleMapTask(stage.id, stage.latestInfo.attemptId, 
taskBinary, part, locs)
           }
@@ -840,7 +860,7 @@ class DAGScheduler(
           partitionsToCompute.map { id =>
             val p: Int = job.partitions(id)
             val part = stage.rdd.partitions(p)
-            val locs = getPreferredLocs(stage.rdd, p)
+            val locs = taskIdToLocations(id)
             new ResultTask(stage.id, stage.latestInfo.attemptId, taskBinary, 
part, locs, id)
           }
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/ab625956/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala 
b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index b86724d..40a333a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -77,8 +77,11 @@ private[spark] abstract class Stage(
   private var _latestInfo: StageInfo = StageInfo.fromStage(this, nextAttemptId)
 
   /** Creates a new attempt for this stage by creating a new StageInfo with a 
new attempt ID. */
-  def makeNewStageAttempt(numPartitionsToCompute: Int): Unit = {
-    _latestInfo = StageInfo.fromStage(this, nextAttemptId, 
Some(numPartitionsToCompute))
+  def makeNewStageAttempt(
+      numPartitionsToCompute: Int,
+      taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit = {
+    _latestInfo = StageInfo.fromStage(
+      this, nextAttemptId, Some(numPartitionsToCompute), 
taskLocalityPreferences)
     nextAttemptId += 1
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ab625956/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala 
b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
index 5d2abbc..24796c1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
@@ -34,7 +34,8 @@ class StageInfo(
     val numTasks: Int,
     val rddInfos: Seq[RDDInfo],
     val parentIds: Seq[Int],
-    val details: String) {
+    val details: String,
+    private[spark] val taskLocalityPreferences: Seq[Seq[TaskLocation]] = 
Seq.empty) {
   /** When this stage was submitted from the DAGScheduler to a TaskScheduler. 
*/
   var submissionTime: Option[Long] = None
   /** Time when all tasks in the stage completed or when the stage was 
cancelled. */
@@ -70,7 +71,12 @@ private[spark] object StageInfo {
    * shuffle dependencies. Therefore, all ancestor RDDs related to this 
Stage's RDD through a
    * sequence of narrow dependencies should also be associated with this Stage.
    */
-  def fromStage(stage: Stage, attemptId: Int, numTasks: Option[Int] = None): 
StageInfo = {
+  def fromStage(
+      stage: Stage,
+      attemptId: Int,
+      numTasks: Option[Int] = None,
+      taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty
+    ): StageInfo = {
     val ancestorRddInfos = stage.rdd.getNarrowAncestors.map(RDDInfo.fromRdd)
     val rddInfos = Seq(RDDInfo.fromRdd(stage.rdd)) ++ ancestorRddInfos
     new StageInfo(
@@ -80,6 +86,7 @@ private[spark] object StageInfo {
       numTasks.getOrElse(stage.numTasks),
       rddInfos,
       stage.parents.map(_.id),
-      stage.details)
+      stage.details,
+      taskLocalityPreferences)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ab625956/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 4be1eda..06f5438 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -86,7 +86,11 @@ private[spark] object CoarseGrainedClusterMessages {
 
   // Request executors by specifying the new total number of executors desired
   // This includes executors already pending or running
-  case class RequestExecutors(requestedTotal: Int) extends 
CoarseGrainedClusterMessage
+  case class RequestExecutors(
+      requestedTotal: Int,
+      localityAwareTasks: Int,
+      hostToLocalTaskCount: Map[String, Int])
+    extends CoarseGrainedClusterMessage
 
   case class KillExecutors(executorIds: Seq[String]) extends 
CoarseGrainedClusterMessage
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ab625956/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index c65b3e5..660702f 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -66,6 +66,12 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
   // Executors we have requested the cluster manager to kill that have not 
died yet
   private val executorsPendingToRemove = new HashSet[String]
 
+  // A map to store hostname with its possible task number running on it
+  protected var hostToLocalTaskCount: Map[String, Int] = Map.empty
+
+  // The number of pending tasks which is locality required
+  protected var localityAwareTasks = 0
+
   class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: 
Seq[(String, String)])
     extends ThreadSafeRpcEndpoint with Logging {
 
@@ -339,6 +345,7 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
     }
     logInfo(s"Requesting $numAdditionalExecutors additional executor(s) from 
the cluster manager")
     logDebug(s"Number of pending executors is now $numPendingExecutors")
+
     numPendingExecutors += numAdditionalExecutors
     // Account for executors pending to be added or removed
     val newTotal = numExistingExecutors + numPendingExecutors - 
executorsPendingToRemove.size
@@ -346,16 +353,33 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
   }
 
   /**
-   * Express a preference to the cluster manager for a given total number of 
executors. This can
-   * result in canceling pending requests or filing additional requests.
-   * @return whether the request is acknowledged.
+   * Update the cluster manager on our scheduling needs. Three bits of 
information are included
+   * to help it make decisions.
+   * @param numExecutors The total number of executors we'd like to have. The 
cluster manager
+   *                     shouldn't kill any running executor to reach this 
number, but,
+   *                     if all existing executors were to die, this is the 
number of executors
+   *                     we'd want to be allocated.
+   * @param localityAwareTasks The number of tasks in all active stages that 
have a locality
+   *                           preferences. This includes running, pending, 
and completed tasks.
+   * @param hostToLocalTaskCount A map of hosts to the number of tasks from 
all active stages
+   *                             that would like to run on that host.
+   *                             This includes running, pending, and completed 
tasks.
+   * @return whether the request is acknowledged by the cluster manager.
    */
-  final override def requestTotalExecutors(numExecutors: Int): Boolean = 
synchronized {
+  final override def requestTotalExecutors(
+      numExecutors: Int,
+      localityAwareTasks: Int,
+      hostToLocalTaskCount: Map[String, Int]
+    ): Boolean = synchronized {
     if (numExecutors < 0) {
       throw new IllegalArgumentException(
         "Attempted to request a negative number of executor(s) " +
           s"$numExecutors from the cluster manager. Please specify a positive 
number!")
     }
+
+    this.localityAwareTasks = localityAwareTasks
+    this.hostToLocalTaskCount = hostToLocalTaskCount
+
     numPendingExecutors =
       math.max(numExecutors - numExistingExecutors + 
executorsPendingToRemove.size, 0)
     doRequestTotalExecutors(numExecutors)

http://git-wip-us.apache.org/repos/asf/spark/blob/ab625956/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index bc67abb..074282d 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -53,7 +53,8 @@ private[spark] abstract class YarnSchedulerBackend(
    * This includes executors already pending or running.
    */
   override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
-    
yarnSchedulerEndpoint.askWithRetry[Boolean](RequestExecutors(requestedTotal))
+    yarnSchedulerEndpoint.askWithRetry[Boolean](
+      RequestExecutors(requestedTotal, localityAwareTasks, 
hostToLocalTaskCount))
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/ab625956/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 803e183..34caca8 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -751,6 +751,42 @@ class ExecutorAllocationManagerSuite
     assert(numExecutorsTarget(manager) === 2)
   }
 
+  test("get pending task number and related locality preference") {
+    sc = createSparkContext(2, 5, 3)
+    val manager = sc.executorAllocationManager.get
+
+    val localityPreferences1 = Seq(
+      Seq(TaskLocation("host1"), TaskLocation("host2"), TaskLocation("host3")),
+      Seq(TaskLocation("host1"), TaskLocation("host2"), TaskLocation("host4")),
+      Seq(TaskLocation("host2"), TaskLocation("host3"), TaskLocation("host4")),
+      Seq.empty,
+      Seq.empty
+    )
+    val stageInfo1 = createStageInfo(1, 5, localityPreferences1)
+    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stageInfo1))
+
+    assert(localityAwareTasks(manager) === 3)
+    assert(hostToLocalTaskCount(manager) ===
+      Map("host1" -> 2, "host2" -> 3, "host3" -> 2, "host4" -> 2))
+
+    val localityPreferences2 = Seq(
+      Seq(TaskLocation("host2"), TaskLocation("host3"), TaskLocation("host5")),
+      Seq(TaskLocation("host3"), TaskLocation("host4"), TaskLocation("host5")),
+      Seq.empty
+    )
+    val stageInfo2 = createStageInfo(2, 3, localityPreferences2)
+    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stageInfo2))
+
+    assert(localityAwareTasks(manager) === 5)
+    assert(hostToLocalTaskCount(manager) ===
+      Map("host1" -> 2, "host2" -> 4, "host3" -> 4, "host4" -> 3, "host5" -> 
2))
+
+    sc.listenerBus.postToAll(SparkListenerStageCompleted(stageInfo1))
+    assert(localityAwareTasks(manager) === 2)
+    assert(hostToLocalTaskCount(manager) ===
+      Map("host2" -> 1, "host3" -> 2, "host4" -> 1, "host5" -> 2))
+  }
+
   private def createSparkContext(
       minExecutors: Int = 1,
       maxExecutors: Int = 5,
@@ -784,8 +820,13 @@ private object ExecutorAllocationManagerSuite extends 
PrivateMethodTester {
   private val sustainedSchedulerBacklogTimeout = 2L
   private val executorIdleTimeout = 3L
 
-  private def createStageInfo(stageId: Int, numTasks: Int): StageInfo = {
-    new StageInfo(stageId, 0, "name", numTasks, Seq.empty, Seq.empty, "no 
details")
+  private def createStageInfo(
+      stageId: Int,
+      numTasks: Int,
+      taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty
+    ): StageInfo = {
+    new StageInfo(
+      stageId, 0, "name", numTasks, Seq.empty, Seq.empty, "no details", 
taskLocalityPreferences)
   }
 
   private def createTaskInfo(taskId: Int, taskIndex: Int, executorId: String): 
TaskInfo = {
@@ -815,6 +856,8 @@ private object ExecutorAllocationManagerSuite extends 
PrivateMethodTester {
   private val _onSchedulerQueueEmpty = 
PrivateMethod[Unit]('onSchedulerQueueEmpty)
   private val _onExecutorIdle = PrivateMethod[Unit]('onExecutorIdle)
   private val _onExecutorBusy = PrivateMethod[Unit]('onExecutorBusy)
+  private val _localityAwareTasks = PrivateMethod[Int]('localityAwareTasks)
+  private val _hostToLocalTaskCount = PrivateMethod[Map[String, 
Int]]('hostToLocalTaskCount)
 
   private def numExecutorsToAdd(manager: ExecutorAllocationManager): Int = {
     manager invokePrivate _numExecutorsToAdd()
@@ -885,4 +928,12 @@ private object ExecutorAllocationManagerSuite extends 
PrivateMethodTester {
   private def onExecutorBusy(manager: ExecutorAllocationManager, id: String): 
Unit = {
     manager invokePrivate _onExecutorBusy(id)
   }
+
+  private def localityAwareTasks(manager: ExecutorAllocationManager): Int = {
+    manager invokePrivate _localityAwareTasks()
+  }
+
+  private def hostToLocalTaskCount(manager: ExecutorAllocationManager): 
Map[String, Int] = {
+    manager invokePrivate _hostToLocalTaskCount()
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ab625956/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala 
b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index 5a2670e..139b8dc 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -182,7 +182,7 @@ class HeartbeatReceiverSuite
 
     // Adjust the target number of executors on the cluster manager side
     assert(fakeClusterManager.getTargetNumExecutors === 0)
-    sc.requestTotalExecutors(2)
+    sc.requestTotalExecutors(2, 0, Map.empty)
     assert(fakeClusterManager.getTargetNumExecutors === 2)
     assert(fakeClusterManager.getExecutorIdsToKill.isEmpty)
 
@@ -241,7 +241,8 @@ private class FakeSchedulerBackend(
   extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
 
   protected override def doRequestTotalExecutors(requestedTotal: Int): Boolean 
= {
-    
clusterManagerEndpoint.askWithRetry[Boolean](RequestExecutors(requestedTotal))
+    clusterManagerEndpoint.askWithRetry[Boolean](
+      RequestExecutors(requestedTotal, localityAwareTasks, 
hostToLocalTaskCount))
   }
 
   protected override def doKillExecutors(executorIds: Seq[String]): Boolean = {
@@ -260,7 +261,7 @@ private class FakeClusterManager(override val rpcEnv: 
RpcEnv) extends RpcEndpoin
   def getExecutorIdsToKill: Set[String] = executorIdsToKill.toSet
 
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, 
Unit] = {
-    case RequestExecutors(requestedTotal) =>
+    case RequestExecutors(requestedTotal, _, _) =>
       targetNumExecutors = requestedTotal
       context.reply(true)
     case KillExecutors(executorIds) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/ab625956/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 83dafa4..44acc73 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -555,11 +555,12 @@ private[spark] class ApplicationMaster(
     }
 
     override def receiveAndReply(context: RpcCallContext): 
PartialFunction[Any, Unit] = {
-      case RequestExecutors(requestedTotal) =>
+      case RequestExecutors(requestedTotal, localityAwareTasks, 
hostToLocalTaskCount) =>
         Option(allocator) match {
           case Some(a) =>
             allocatorLock.synchronized {
-              if (a.requestTotalExecutors(requestedTotal)) {
+              if 
(a.requestTotalExecutorsWithPreferredLocalities(requestedTotal,
+                localityAwareTasks, hostToLocalTaskCount)) {
                 allocatorLock.notifyAll()
               }
             }

http://git-wip-us.apache.org/repos/asf/spark/blob/ab625956/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
----------------------------------------------------------------------
diff --git 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
new file mode 100644
index 0000000..0817802
--- /dev/null
+++ 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable.{ArrayBuffer, HashMap, Set}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.api.records.{ContainerId, Resource}
+import org.apache.hadoop.yarn.util.RackResolver
+
+import org.apache.spark.SparkConf
+
+private[yarn] case class ContainerLocalityPreferences(nodes: Array[String], 
racks: Array[String])
+
+/**
+ * This strategy is calculating the optimal locality preferences of YARN 
containers by considering
+ * the node ratio of pending tasks, number of required cores/containers and 
locality of current
+ * existing containers. The target of this algorithm is to maximize the number 
of tasks that
+ * would run locally.
+ *
+ * Consider a situation in which we have 20 tasks that require (host1, host2, 
host3)
+ * and 10 tasks that require (host1, host2, host4), besides each container has 
2 cores
+ * and cpus per task is 1, so the required container number is 15,
+ * and host ratio is (host1: 30, host2: 30, host3: 20, host4: 10).
+ *
+ * 1. If requested container number (18) is more than the required container 
number (15):
+ *
+ * requests for 5 containers with nodes: (host1, host2, host3, host4)
+ * requests for 5 containers with nodes: (host1, host2, host3)
+ * requests for 5 containers with nodes: (host1, host2)
+ * requests for 3 containers with no locality preferences.
+ *
+ * The placement ratio is 3 : 3 : 2 : 1, and set the additional containers 
with no locality
+ * preferences.
+ *
+ * 2. If requested container number (10) is less than or equal to the required 
container number
+ * (15):
+ *
+ * requests for 4 containers with nodes: (host1, host2, host3, host4)
+ * requests for 3 containers with nodes: (host1, host2, host3)
+ * requests for 3 containers with nodes: (host1, host2)
+ *
+ * The placement ratio is 10 : 10 : 7 : 4, close to expected ratio (3 : 3 : 2 
: 1)
+ *
+ * 3. If containers exist but none of them can match the requested localities,
+ * follow the method of 1 and 2.
+ *
+ * 4. If containers exist and some of them can match the requested localities.
+ * For example if we have 1 container on each node (host1: 1, host2: 1, 
host3: 1, host4: 1),
+ * and the expected containers on each node would be (host1: 5, host2: 5, 
host3: 4, host4: 2),
+ * so the newly requested containers on each node would be updated to (host1: 
4, host2: 4,
+ * host3: 3, host4: 1), 12 containers by total.
+ *
+ *   4.1 If requested container number (18) is more than newly required 
containers (12). Follow
+ *   method 1 with updated ratio 4 : 4 : 3 : 1.
+ *
+ *   4.2 If requested container number (10) is less than or equal to newly required 
containers (12). Follow
+ *   method 2 with updated ratio 4 : 4 : 3 : 1.
+ *
+ * 5. If containers exist and existing localities can fully cover the 
requested localities.
+ * For example if we have 5 containers on each node (host1: 5, host2: 5, 
host3: 5, host4: 5),
+ * which could cover the current requested localities. This algorithm will 
allocate all the
+ * requested containers with no localities.
+ */
+private[yarn] class LocalityPreferredContainerPlacementStrategy(
+    val sparkConf: SparkConf,
+    val yarnConf: Configuration,
+    val resource: Resource) {
+
+  // Number of CPUs per task
+  private val CPUS_PER_TASK = sparkConf.getInt("spark.task.cpus", 1)
+
+  /**
+   * Calculate each container's node locality and rack locality
+   * @param numContainer number of containers to calculate
+   * @param numLocalityAwareTasks number of locality required tasks
+   * @param hostToLocalTaskCount a map to store the preferred hostname and 
possible task
+   *                             numbers running on it, used as hints for 
container allocation
+   * @return node localities and rack localities, each locality is an array of 
string,
+   *         the length of localities is the same as number of containers
+   */
+  def localityOfRequestedContainers(
+      numContainer: Int,
+      numLocalityAwareTasks: Int,
+      hostToLocalTaskCount: Map[String, Int],
+      allocatedHostToContainersMap: HashMap[String, Set[ContainerId]]
+    ): Array[ContainerLocalityPreferences] = {
+    val updatedHostToContainerCount = expectedHostToContainerCount(
+      numLocalityAwareTasks, hostToLocalTaskCount, 
allocatedHostToContainersMap)
+    val updatedLocalityAwareContainerNum = 
updatedHostToContainerCount.values.sum
+
+    // The number of containers to allocate, divided into two groups, one with 
preferred locality,
+    // and the other without locality preference.
+    val requiredLocalityFreeContainerNum =
+      math.max(0, numContainer - updatedLocalityAwareContainerNum)
+    val requiredLocalityAwareContainerNum = numContainer - 
requiredLocalityFreeContainerNum
+
+    val containerLocalityPreferences = 
ArrayBuffer[ContainerLocalityPreferences]()
+    if (requiredLocalityFreeContainerNum > 0) {
+      for (i <- 0 until requiredLocalityFreeContainerNum) {
+        containerLocalityPreferences += ContainerLocalityPreferences(
+          null.asInstanceOf[Array[String]], null.asInstanceOf[Array[String]])
+      }
+    }
+
+    if (requiredLocalityAwareContainerNum > 0) {
+      val largestRatio = updatedHostToContainerCount.values.max
+      // Scale each host's ratio so that the largest one equals the number of 
locality aware
+      // containers to request (rounding up); the result drives the preferred host selection.
+      var preferredLocalityRatio = updatedHostToContainerCount.mapValues { 
ratio =>
+        val adjustedRatio = ratio.toDouble * requiredLocalityAwareContainerNum 
/ largestRatio
+        adjustedRatio.ceil.toInt
+      }
+
+      for (i <- 0 until requiredLocalityAwareContainerNum) {
+        // Only keep the hosts whose ratio is larger than 0, which means the 
current host can
+        // still be allocated with new container request.
+        val hosts = preferredLocalityRatio.filter(_._2 > 0).keys.toArray
+        val racks = hosts.map { h =>
+          RackResolver.resolve(yarnConf, h).getNetworkLocation
+        }.toSet
+        containerLocalityPreferences += ContainerLocalityPreferences(hosts, 
racks.toArray)
+
+        // Minus 1 each time when the host is used. When the current ratio is 
0,
+        // which means all the required ratio is satisfied, this host will not 
be allocated again.
+        preferredLocalityRatio = preferredLocalityRatio.mapValues(_ - 1)
+      }
+    }
+
+    containerLocalityPreferences.toArray
+  }
+
+  /**
+   * Calculate the number of executors needed to satisfy the given number of 
pending tasks.
+   */
+  private def numExecutorsPending(numTasksPending: Int): Int = {
+    val coresPerExecutor = resource.getVirtualCores
+    (numTasksPending * CPUS_PER_TASK + coresPerExecutor - 1) / coresPerExecutor
+  }
+
+  /**
+   * Calculate the number of containers still required on each host, i.e. the expected count minus 
the already allocated containers.
+   * @param localityAwareTasks number of locality aware tasks
+   * @param hostToLocalTaskCount a map to store the preferred hostname and 
possible task
+   *                             numbers running on it, used as hints for 
container allocation
+   * @return a map with hostname as key and required number of containers on 
this host as value
+   */
+  private def expectedHostToContainerCount(
+      localityAwareTasks: Int,
+      hostToLocalTaskCount: Map[String, Int],
+      allocatedHostToContainersMap: HashMap[String, Set[ContainerId]]
+    ): Map[String, Int] = {
+    val totalLocalTaskNum = hostToLocalTaskCount.values.sum
+    hostToLocalTaskCount.map { case (host, count) =>
+      val expectedCount =
+        count.toDouble * numExecutorsPending(localityAwareTasks) / 
totalLocalTaskNum
+      val existedCount = allocatedHostToContainersMap.get(host)
+        .map(_.size)
+        .getOrElse(0)
+
+      // If the existing containers cannot fully satisfy the expected number of 
containers,
+      // the required container number is expected count minus existed count. 
Otherwise the
+      // required container number is 0.
+      (host, math.max(0, (expectedCount - existedCount).ceil.toInt))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ab625956/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
----------------------------------------------------------------------
diff --git 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 940873f..6c10339 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -96,7 +96,7 @@ private[yarn] class YarnAllocator(
   // Number of cores per executor.
   protected val executorCores = args.executorCores
   // Resource capability requested for each executors
-  private val resource = Resource.newInstance(executorMemory + memoryOverhead, 
executorCores)
+  private[yarn] val resource = Resource.newInstance(executorMemory + 
memoryOverhead, executorCores)
 
   private val launcherPool = new ThreadPoolExecutor(
     // max pool size of Integer.MAX_VALUE is ignored because we use an 
unbounded queue
@@ -127,6 +127,16 @@ private[yarn] class YarnAllocator(
     }
   }
 
+  // A map to store preferred hostname and possible task numbers running on it.
+  private var hostToLocalTaskCounts: Map[String, Int] = Map.empty
+
+  // Number of tasks that have locality preferences in active stages
+  private var numLocalityAwareTasks: Int = 0
+
+  // A container placement strategy based on pending tasks' locality preference
+  private[yarn] val containerPlacementStrategy =
+    new LocalityPreferredContainerPlacementStrategy(sparkConf, conf, resource)
+
   def getNumExecutorsRunning: Int = numExecutorsRunning
 
   def getNumExecutorsFailed: Int = numExecutorsFailed
@@ -146,10 +156,19 @@ private[yarn] class YarnAllocator(
    * Request as many executors from the ResourceManager as needed to reach the 
desired total. If
    * the requested total is smaller than the current number of running 
executors, no executors will
    * be killed.
-   *
+   * @param requestedTotal total number of containers requested
+   * @param localityAwareTasks number of locality aware tasks to be used as 
container placement hint
+   * @param hostToLocalTaskCount a map of preferred hostname to possible task 
counts to be used as
+   *                             container placement hint.
    * @return Whether the new requested total is different than the old value.
    */
-  def requestTotalExecutors(requestedTotal: Int): Boolean = synchronized {
+  def requestTotalExecutorsWithPreferredLocalities(
+      requestedTotal: Int,
+      localityAwareTasks: Int,
+      hostToLocalTaskCount: Map[String, Int]): Boolean = synchronized {
+    this.numLocalityAwareTasks = localityAwareTasks
+    this.hostToLocalTaskCounts = hostToLocalTaskCount
+
     if (requestedTotal != targetNumExecutors) {
       logInfo(s"Driver requested a total number of $requestedTotal 
executor(s).")
       targetNumExecutors = requestedTotal
@@ -221,12 +240,20 @@ private[yarn] class YarnAllocator(
     val numPendingAllocate = getNumPendingAllocate
     val missing = targetNumExecutors - numPendingAllocate - numExecutorsRunning
 
+    // TODO. Consider locality preferences of pending container requests.
+    // Since the last time we made container requests, stages may have completed 
and new ones been submitted,
+    // so the localities at which we requested our pending executors may
+    // no longer apply to our current needs. We should consider removing all 
outstanding
+    // container requests and adding requests anew each time to avoid this.
     if (missing > 0) {
       logInfo(s"Will request $missing executor containers, each with 
${resource.getVirtualCores} " +
         s"cores and ${resource.getMemory} MB memory including $memoryOverhead 
MB overhead")
 
-      for (i <- 0 until missing) {
-        val request = createContainerRequest(resource)
+      val containerLocalityPreferences = 
containerPlacementStrategy.localityOfRequestedContainers(
+        missing, numLocalityAwareTasks, hostToLocalTaskCounts, 
allocatedHostToContainersMap)
+
+      for (locality <- containerLocalityPreferences) {
+        val request = createContainerRequest(resource, locality.nodes, 
locality.racks)
         amClient.addContainerRequest(request)
         val nodes = request.getNodes
         val hostStr = if (nodes == null || nodes.isEmpty) "Any" else nodes.last
@@ -249,11 +276,14 @@ private[yarn] class YarnAllocator(
    * Creates a container request, handling the reflection required to use YARN 
features that were
    * added in recent versions.
    */
-  private def createContainerRequest(resource: Resource): ContainerRequest = {
+  protected def createContainerRequest(
+      resource: Resource,
+      nodes: Array[String],
+      racks: Array[String]): ContainerRequest = {
     nodeLabelConstructor.map { constructor =>
-      constructor.newInstance(resource, null, null, RM_REQUEST_PRIORITY, true: 
java.lang.Boolean,
+      constructor.newInstance(resource, nodes, racks, RM_REQUEST_PRIORITY, 
true: java.lang.Boolean,
         labelExpression.orNull)
-    }.getOrElse(new ContainerRequest(resource, null, null, 
RM_REQUEST_PRIORITY))
+    }.getOrElse(new ContainerRequest(resource, nodes, racks, 
RM_REQUEST_PRIORITY))
   }
 
   /**
@@ -437,7 +467,6 @@ private[yarn] class YarnAllocator(
     releasedContainers.add(container.getId())
     amClient.releaseAssignedContainer(container.getId())
   }
-
 }
 
 private object YarnAllocator {

http://git-wip-us.apache.org/repos/asf/spark/blob/ab625956/yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala
----------------------------------------------------------------------
diff --git 
a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala
 
b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala
new file mode 100644
index 0000000..b7fe4cc
--- /dev/null
+++ 
b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import org.scalatest.{BeforeAndAfterEach, Matchers}
+
+import org.apache.spark.SparkFunSuite
+
+class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with 
BeforeAndAfterEach {
+
+  private val yarnAllocatorSuite = new YarnAllocatorSuite
+  import yarnAllocatorSuite._
+
+  override def beforeEach() {
+    yarnAllocatorSuite.beforeEach()
+  }
+
+  override def afterEach() {
+    yarnAllocatorSuite.afterEach()
+  }
+
+  test("allocate locality preferred containers with enough resource and no 
matched existed " +
+    "containers") {
+    // 1. All the locations of current containers cannot satisfy the new 
requirements
+    // 2. Current requested container number can fully satisfy the pending 
tasks.
+
+    val handler = createAllocator(2)
+    handler.updateResourceRequests()
+    handler.handleAllocatedContainers(Array(createContainer("host1"), 
createContainer("host2")))
+
+    val localities = 
handler.containerPlacementStrategy.localityOfRequestedContainers(
+      3, 15, Map("host3" -> 15, "host4" -> 15, "host5" -> 10), 
handler.allocatedHostToContainersMap)
+
+    assert(localities.map(_.nodes) === Array(
+      Array("host3", "host4", "host5"),
+      Array("host3", "host4", "host5"),
+      Array("host3", "host4")))
+  }
+
+  test("allocate locality preferred containers with enough resource and 
partially matched " +
+    "containers") {
+    // 1. Parts of current containers' locations can satisfy the new 
requirements
+    // 2. Current requested container number can fully satisfy the pending 
tasks.
+
+    val handler = createAllocator(3)
+    handler.updateResourceRequests()
+    handler.handleAllocatedContainers(Array(
+      createContainer("host1"),
+      createContainer("host1"),
+      createContainer("host2")
+    ))
+
+    val localities = 
handler.containerPlacementStrategy.localityOfRequestedContainers(
+      3, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10), 
handler.allocatedHostToContainersMap)
+
+    assert(localities.map(_.nodes) ===
+      Array(null, Array("host2", "host3"), Array("host2", "host3")))
+  }
+
+  test("allocate locality preferred containers with limited resource and 
partially matched " +
+    "containers") {
+    // 1. Parts of current containers' locations can satisfy the new 
requirements
+    // 2. Current requested container number cannot fully satisfy the pending 
tasks.
+
+    val handler = createAllocator(3)
+    handler.updateResourceRequests()
+    handler.handleAllocatedContainers(Array(
+      createContainer("host1"),
+      createContainer("host1"),
+      createContainer("host2")
+    ))
+
+    val localities = 
handler.containerPlacementStrategy.localityOfRequestedContainers(
+      1, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10), 
handler.allocatedHostToContainersMap)
+
+    assert(localities.map(_.nodes) === Array(Array("host2", "host3")))
+  }
+
+  test("allocate locality preferred containers with fully matched containers") 
{
+    // Current containers' locations can fully satisfy the new requirements
+
+    val handler = createAllocator(5)
+    handler.updateResourceRequests()
+    handler.handleAllocatedContainers(Array(
+      createContainer("host1"),
+      createContainer("host1"),
+      createContainer("host2"),
+      createContainer("host2"),
+      createContainer("host3")
+    ))
+
+    val localities = 
handler.containerPlacementStrategy.localityOfRequestedContainers(
+      3, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10), 
handler.allocatedHostToContainersMap)
+
+    assert(localities.map(_.nodes) === Array(null, null, null))
+  }
+
+  test("allocate containers with no locality preference") {
+    // Request new container without locality preference
+
+    val handler = createAllocator(2)
+    handler.updateResourceRequests()
+    handler.handleAllocatedContainers(Array(createContainer("host1"), 
createContainer("host2")))
+
+    val localities = 
handler.containerPlacementStrategy.localityOfRequestedContainers(
+      1, 0, Map.empty, handler.allocatedHostToContainersMap)
+
+    assert(localities.map(_.nodes) === Array(null))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ab625956/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
----------------------------------------------------------------------
diff --git 
a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala 
b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 7509000..37a789f 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.net.DNSToSwitchMapping
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.scalatest.{BeforeAndAfterEach, Matchers}
 
 import org.apache.spark.{SecurityManager, SparkFunSuite}
 import org.apache.spark.SparkConf
@@ -32,8 +33,6 @@ import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.deploy.yarn.YarnAllocator._
 import org.apache.spark.scheduler.SplitInfo
 
-import org.scalatest.{BeforeAndAfterEach, Matchers}
-
 class MockResolver extends DNSToSwitchMapping {
 
   override def resolve(names: JList[String]): JList[String] = {
@@ -171,7 +170,7 @@ class YarnAllocatorSuite extends SparkFunSuite with 
Matchers with BeforeAndAfter
     handler.getNumExecutorsRunning should be (0)
     handler.getNumPendingAllocate should be (4)
 
-    handler.requestTotalExecutors(3)
+    handler.requestTotalExecutorsWithPreferredLocalities(3, 0, Map.empty)
     handler.updateResourceRequests()
     handler.getNumPendingAllocate should be (3)
 
@@ -182,7 +181,7 @@ class YarnAllocatorSuite extends SparkFunSuite with 
Matchers with BeforeAndAfter
     handler.allocatedContainerToHostMap.get(container.getId).get should be 
("host1")
     handler.allocatedHostToContainersMap.get("host1").get should contain 
(container.getId)
 
-    handler.requestTotalExecutors(2)
+    handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map.empty)
     handler.updateResourceRequests()
     handler.getNumPendingAllocate should be (1)
   }
@@ -193,7 +192,7 @@ class YarnAllocatorSuite extends SparkFunSuite with 
Matchers with BeforeAndAfter
     handler.getNumExecutorsRunning should be (0)
     handler.getNumPendingAllocate should be (4)
 
-    handler.requestTotalExecutors(3)
+    handler.requestTotalExecutorsWithPreferredLocalities(3, 0, Map.empty)
     handler.updateResourceRequests()
     handler.getNumPendingAllocate should be (3)
 
@@ -203,7 +202,7 @@ class YarnAllocatorSuite extends SparkFunSuite with 
Matchers with BeforeAndAfter
 
     handler.getNumExecutorsRunning should be (2)
 
-    handler.requestTotalExecutors(1)
+    handler.requestTotalExecutorsWithPreferredLocalities(1, 0, Map.empty)
     handler.updateResourceRequests()
     handler.getNumPendingAllocate should be (0)
     handler.getNumExecutorsRunning should be (2)
@@ -219,7 +218,7 @@ class YarnAllocatorSuite extends SparkFunSuite with 
Matchers with BeforeAndAfter
     val container2 = createContainer("host2")
     handler.handleAllocatedContainers(Array(container1, container2))
 
-    handler.requestTotalExecutors(1)
+    handler.requestTotalExecutorsWithPreferredLocalities(1, 0, Map.empty)
     handler.executorIdToContainer.keys.foreach { id => handler.killExecutor(id 
) }
 
     val statuses = Seq(container1, container2).map { c =>
@@ -241,5 +240,4 @@ class YarnAllocatorSuite extends SparkFunSuite with 
Matchers with BeforeAndAfter
     assert(vmemMsg.contains("5.8 GB of 4.2 GB virtual memory used."))
     assert(pmemMsg.contains("2.1 MB of 2 GB physical memory used."))
   }
-
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to