Repository: spark
Updated Branches:
  refs/heads/master 5a826c00c -> 63bdb1f41


SPARK-2294: fix locality inversion bug in TaskManager

copied from original JIRA (https://issues.apache.org/jira/browse/SPARK-2294):

If an executor E is free, a task may be speculatively assigned to E when there 
are other tasks in the job that have not been launched (at all) yet. Similarly, 
a task without any locality preferences may be assigned to E when there was 
another NODE_LOCAL task that could have been scheduled.
This happens because TaskSchedulerImpl calls TaskSetManager.resourceOffer 
(which in turn calls TaskSetManager.findTask) with increasing locality levels, 
beginning with PROCESS_LOCAL, followed by NODE_LOCAL, and so on until the 
highest currently allowed level. Now, supposed NODE_LOCAL is the highest 
currently allowed locality level. The first time findTask is called, it will be 
called with max level PROCESS_LOCAL; if it cannot find any PROCESS_LOCAL tasks, 
it will try to schedule tasks with no locality preferences or speculative 
tasks. As a result, speculative tasks or tasks with no preferences may be 
scheduled instead of NODE_LOCAL tasks.

----

I added an additional parameter in resourceOffer and findTask, maxLocality, 
indicating when we should consider the tasks without locality preference

Author: CodingCat <zhunans...@gmail.com>

Closes #1313 from CodingCat/SPARK-2294 and squashes the following commits:

bf3f13b [CodingCat] rollback some forgotten changes
89f9bc0 [CodingCat] address matei's comments
18cae02 [CodingCat] add test case for node-local tasks
2ba6195 [CodingCat] fix failed test cases
87dd09e [CodingCat] fix style
9b9432f [CodingCat] remove hasNodeLocalOnlyTasks
fdd1573 [CodingCat] fix failed test cases
941a4fd [CodingCat] see my shocked face..........
f600085 [CodingCat] remove hasNodeLocalOnlyTasks checking
0b8a46b [CodingCat] test whether hasNodeLocalOnlyTasks affect the results
73ceda8 [CodingCat] style fix
b3a430b [CodingCat] remove fine granularity tracking for node-local only tasks
f9a2ad8 [CodingCat] simplify the logic in TaskSchedulerImpl
c8c1de4 [CodingCat] simplify the patch
be652ed [CodingCat] avoid unnecessary delay when we only have nopref tasks
dee9e22 [CodingCat] fix locality inversion bug in TaskManager by moving nopref 
branch


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/63bdb1f4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/63bdb1f4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/63bdb1f4

Branch: refs/heads/master
Commit: 63bdb1f41b4895e3a9444f7938094438a94d3007
Parents: 5a826c0
Author: CodingCat <zhunans...@gmail.com>
Authored: Tue Aug 5 23:02:58 2014 -0700
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Tue Aug 5 23:02:58 2014 -0700

----------------------------------------------------------------------
 .../apache/spark/scheduler/TaskLocality.scala   |   2 +-
 .../spark/scheduler/TaskSchedulerImpl.scala     |   7 +-
 .../apache/spark/scheduler/TaskSetManager.scala | 109 +++++-----
 .../spark/scheduler/TaskSetManagerSuite.scala   | 205 +++++++++++++------
 4 files changed, 203 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/63bdb1f4/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala
index eb920ab..f176d09 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala
@@ -22,7 +22,7 @@ import org.apache.spark.annotation.DeveloperApi
 @DeveloperApi
 object TaskLocality extends Enumeration {
   // Process local is expected to be used ONLY within TaskSetManager for now.
-  val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value
+  val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value
 
   type TaskLocality = Value
 

http://git-wip-us.apache.org/repos/asf/spark/blob/63bdb1f4/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index d2f764f..6c0d1b2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -89,11 +89,11 @@ private[spark] class TaskSchedulerImpl(
 
   // The set of executors we have on each host; this is used to compute 
hostsAlive, which
   // in turn is used to decide when we can attain data locality on a given host
-  private val executorsByHost = new HashMap[String, HashSet[String]]
+  protected val executorsByHost = new HashMap[String, HashSet[String]]
 
   protected val hostsByRack = new HashMap[String, HashSet[String]]
 
-  private val executorIdToHost = new HashMap[String, String]
+  protected val executorIdToHost = new HashMap[String, String]
 
   // Listener object to pass upcalls into
   var dagScheduler: DAGScheduler = null
@@ -249,6 +249,7 @@ private[spark] class TaskSchedulerImpl(
 
     // Take each TaskSet in our scheduling order, and then offer it each node 
in increasing order
     // of locality levels so that it gets a chance to launch local tasks on 
all of them.
+    // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, 
RACK_LOCAL, ANY
     var launchedTask = false
     for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
       do {
@@ -265,7 +266,7 @@ private[spark] class TaskSchedulerImpl(
               activeExecutorIds += execId
               executorsByHost(host) += execId
               availableCpus(i) -= CPUS_PER_TASK
-              assert (availableCpus(i) >= 0)
+              assert(availableCpus(i) >= 0)
               launchedTask = true
             }
           }

http://git-wip-us.apache.org/repos/asf/spark/blob/63bdb1f4/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 8b5e8cb..20a4bd1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -79,6 +79,7 @@ private[spark] class TaskSetManager(
   private val numFailures = new Array[Int](numTasks)
   // key is taskId, value is a Map of executor id to when it failed
   private val failedExecutors = new HashMap[Int, HashMap[String, Long]]()
+
   val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)
   var tasksSuccessful = 0
 
@@ -179,26 +180,17 @@ private[spark] class TaskSetManager(
       }
     }
 
-    var hadAliveLocations = false
     for (loc <- tasks(index).preferredLocations) {
       for (execId <- loc.executorId) {
         addTo(pendingTasksForExecutor.getOrElseUpdate(execId, new ArrayBuffer))
       }
-      if (sched.hasExecutorsAliveOnHost(loc.host)) {
-        hadAliveLocations = true
-      }
       addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer))
       for (rack <- sched.getRackForHost(loc.host)) {
         addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer))
-        if(sched.hasHostAliveOnRack(rack)){
-          hadAliveLocations = true
-        }
       }
     }
 
-    if (!hadAliveLocations) {
-      // Even though the task might've had preferred locations, all of those 
hosts or executors
-      // are dead; put it in the no-prefs list so we can schedule it elsewhere 
right away.
+    if (tasks(index).preferredLocations == Nil) {
       addTo(pendingTasksWithNoPrefs)
     }
 
@@ -239,7 +231,6 @@ private[spark] class TaskSetManager(
    */
   private def findTaskFromList(execId: String, list: ArrayBuffer[Int]): 
Option[Int] = {
     var indexOffset = list.size
-
     while (indexOffset > 0) {
       indexOffset -= 1
       val index = list(indexOffset)
@@ -288,12 +279,12 @@ private[spark] class TaskSetManager(
       !hasAttemptOnHost(index, host) && !executorIsBlacklisted(execId, index)
 
     if (!speculatableTasks.isEmpty) {
-      // Check for process-local or preference-less tasks; note that tasks can 
be process-local
+      // Check for process-local tasks; note that tasks can be process-local
       // on multiple nodes when we replicate cached blocks, as in Spark 
Streaming
       for (index <- speculatableTasks if canRunOnHost(index)) {
         val prefs = tasks(index).preferredLocations
         val executors = prefs.flatMap(_.executorId)
-        if (prefs.size == 0 || executors.contains(execId)) {
+        if (executors.contains(execId)) {
           speculatableTasks -= index
           return Some((index, TaskLocality.PROCESS_LOCAL))
         }
@@ -310,6 +301,17 @@ private[spark] class TaskSetManager(
         }
       }
 
+      // Check for no-preference tasks
+      if (TaskLocality.isAllowed(locality, TaskLocality.NO_PREF)) {
+        for (index <- speculatableTasks if canRunOnHost(index)) {
+          val locations = tasks(index).preferredLocations
+          if (locations.size == 0) {
+            speculatableTasks -= index
+            return Some((index, TaskLocality.PROCESS_LOCAL))
+          }
+        }
+      }
+
       // Check for rack-local tasks
       if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
         for (rack <- sched.getRackForHost(host)) {
@@ -341,20 +343,27 @@ private[spark] class TaskSetManager(
    *
    * @return An option containing (task index within the task set, locality, 
is speculative?)
    */
-  private def findTask(execId: String, host: String, locality: 
TaskLocality.Value)
+  private def findTask(execId: String, host: String, maxLocality: 
TaskLocality.Value)
     : Option[(Int, TaskLocality.Value, Boolean)] =
   {
     for (index <- findTaskFromList(execId, 
getPendingTasksForExecutor(execId))) {
       return Some((index, TaskLocality.PROCESS_LOCAL, false))
     }
 
-    if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
+    if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) {
       for (index <- findTaskFromList(execId, getPendingTasksForHost(host))) {
         return Some((index, TaskLocality.NODE_LOCAL, false))
       }
     }
 
-    if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
+    if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) {
+      // Look for noPref tasks after NODE_LOCAL for minimize cross-rack traffic
+      for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)) {
+        return Some((index, TaskLocality.PROCESS_LOCAL, false))
+      }
+    }
+
+    if (TaskLocality.isAllowed(maxLocality, TaskLocality.RACK_LOCAL)) {
       for {
         rack <- sched.getRackForHost(host)
         index <- findTaskFromList(execId, getPendingTasksForRack(rack))
@@ -363,25 +372,27 @@ private[spark] class TaskSetManager(
       }
     }
 
-    // Look for no-pref tasks after rack-local tasks since they can run 
anywhere.
-    for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)) {
-      return Some((index, TaskLocality.PROCESS_LOCAL, false))
-    }
-
-    if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
+    if (TaskLocality.isAllowed(maxLocality, TaskLocality.ANY)) {
       for (index <- findTaskFromList(execId, allPendingTasks)) {
         return Some((index, TaskLocality.ANY, false))
       }
     }
 
-    // Finally, if all else has failed, find a speculative task
-    findSpeculativeTask(execId, host, locality).map { case (taskIndex, 
allowedLocality) =>
-      (taskIndex, allowedLocality, true)
-    }
+    // find a speculative task if all others tasks have been scheduled
+    findSpeculativeTask(execId, host, maxLocality).map {
+      case (taskIndex, allowedLocality) => (taskIndex, allowedLocality, true)}
   }
 
   /**
    * Respond to an offer of a single executor from the scheduler by finding a 
task
+   *
+   * NOTE: this function is either called with a maxLocality which
+   * would be adjusted by delay scheduling algorithm or it will be with a 
special
+   * NO_PREF locality which will be not modified
+   *
+   * @param execId the executor Id of the offered resource
+   * @param host  the host Id of the offered resource
+   * @param maxLocality the maximum locality we want to schedule the tasks at
    */
   def resourceOffer(
       execId: String,
@@ -392,9 +403,14 @@ private[spark] class TaskSetManager(
     if (!isZombie) {
       val curTime = clock.getTime()
 
-      var allowedLocality = getAllowedLocalityLevel(curTime)
-      if (allowedLocality > maxLocality) {
-        allowedLocality = maxLocality   // We're not allowed to search for 
farther-away tasks
+      var allowedLocality = maxLocality
+
+      if (maxLocality != TaskLocality.NO_PREF) {
+        allowedLocality = getAllowedLocalityLevel(curTime)
+        if (allowedLocality > maxLocality) {
+          // We're not allowed to search for farther-away tasks
+          allowedLocality = maxLocality
+        }
       }
 
       findTask(execId, host, allowedLocality) match {
@@ -410,8 +426,11 @@ private[spark] class TaskSetManager(
           taskInfos(taskId) = info
           taskAttempts(index) = info :: taskAttempts(index)
           // Update our locality level for delay scheduling
-          currentLocalityIndex = getLocalityIndex(taskLocality)
-          lastLaunchTime = curTime
+          // NO_PREF will not affect the variables related to delay scheduling
+          if (maxLocality != TaskLocality.NO_PREF) {
+            currentLocalityIndex = getLocalityIndex(taskLocality)
+            lastLaunchTime = curTime
+          }
           // Serialize and return the task
           val startTime = clock.getTime()
           // We rely on the DAGScheduler to catch non-serializable closures 
and RDDs, so in here
@@ -639,8 +658,7 @@ private[spark] class TaskSetManager(
   override def executorLost(execId: String, host: String) {
     logInfo("Re-queueing tasks for " + execId + " from TaskSet " + taskSet.id)
 
-    // Re-enqueue pending tasks for this host based on the status of the 
cluster -- for example, a
-    // task that used to have locations on only this host might now go to the 
no-prefs list. Note
+    // Re-enqueue pending tasks for this host based on the status of the 
cluster. Note
     // that it's okay if we add a task to the same queue twice (if it had 
multiple preferred
     // locations), because findTaskFromList will skip already-running tasks.
     for (index <- getPendingTasksForExecutor(execId)) {
@@ -671,6 +689,9 @@ private[spark] class TaskSetManager(
     for ((tid, info) <- taskInfos if info.running && info.executorId == 
execId) {
       handleFailedTask(tid, TaskState.FAILED, ExecutorLostFailure)
     }
+    // recalculate valid locality levels and waits when executor is lost
+    myLocalityLevels = computeValidLocalityLevels()
+    localityWaits = myLocalityLevels.map(getLocalityWait)
   }
 
   /**
@@ -722,17 +743,17 @@ private[spark] class TaskSetManager(
         conf.get("spark.locality.wait.node", defaultWait).toLong
       case TaskLocality.RACK_LOCAL =>
         conf.get("spark.locality.wait.rack", defaultWait).toLong
-      case TaskLocality.ANY =>
-        0L
+      case _ => 0L
     }
   }
 
   /**
    * Compute the locality levels used in this TaskSet. Assumes that all tasks 
have already been
    * added to queues using addPendingTask.
+   *
    */
   private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = 
{
-    import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY}
+    import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY}
     val levels = new ArrayBuffer[TaskLocality.TaskLocality]
     if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 
0 &&
         pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) {
@@ -742,6 +763,9 @@ private[spark] class TaskSetManager(
         pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) {
       levels += NODE_LOCAL
     }
+    if (!pendingTasksWithNoPrefs.isEmpty) {
+      levels += NO_PREF
+    }
     if (!pendingTasksForRack.isEmpty && getLocalityWait(RACK_LOCAL) != 0 &&
         pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) {
       levels += RACK_LOCAL
@@ -751,20 +775,7 @@ private[spark] class TaskSetManager(
     levels.toArray
   }
 
-  // Re-compute pendingTasksWithNoPrefs since new preferred locations may 
become available
   def executorAdded() {
-    def newLocAvail(index: Int): Boolean = {
-      for (loc <- tasks(index).preferredLocations) {
-        if (sched.hasExecutorsAliveOnHost(loc.host) ||
-           (sched.getRackForHost(loc.host).isDefined &&
-           sched.hasHostAliveOnRack(sched.getRackForHost(loc.host).get))) {
-          return true
-        }
-      }
-      false
-    }
-    logInfo("Re-computing pending task lists.")
-    pendingTasksWithNoPrefs = pendingTasksWithNoPrefs.filter(!newLocAvail(_))
     myLocalityLevels = computeValidLocalityLevels()
     localityWaits = myLocalityLevels.map(getLocalityWait)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/63bdb1f4/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index c52368b..ffd2338 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -85,14 +85,31 @@ class FakeTaskScheduler(sc: SparkContext, liveExecutors: 
(String, String)* /* ex
   val finishedManagers = new ArrayBuffer[TaskSetManager]
   val taskSetsFailed = new ArrayBuffer[String]
 
-  val executors = new mutable.HashMap[String, String] ++ liveExecutors
+  val executors = new mutable.HashMap[String, String]
+  for ((execId, host) <- liveExecutors) {
+    addExecutor(execId, host)
+  }
+
   for ((execId, host) <- liveExecutors; rack <- getRackForHost(host)) {
     hostsByRack.getOrElseUpdate(rack, new mutable.HashSet[String]()) += host
   }
 
   dagScheduler = new FakeDAGScheduler(sc, this)
 
-  def removeExecutor(execId: String): Unit = executors -= execId
+  def removeExecutor(execId: String) {
+    executors -= execId
+    val host = executorIdToHost.get(execId)
+    assert(host != None)
+    val hostId = host.get
+    val executorsOnHost = executorsByHost(hostId)
+    executorsOnHost -= execId
+    for (rack <- getRackForHost(hostId); hosts <- hostsByRack.get(rack)) {
+      hosts -= hostId
+      if (hosts.isEmpty) {
+        hostsByRack -= rack
+      }
+    }
+  }
 
   override def taskSetFinished(manager: TaskSetManager): Unit = 
finishedManagers += manager
 
@@ -100,8 +117,15 @@ class FakeTaskScheduler(sc: SparkContext, liveExecutors: 
(String, String)* /* ex
 
   override def hasExecutorsAliveOnHost(host: String): Boolean = 
executors.values.exists(_ == host)
 
+  override def hasHostAliveOnRack(rack: String): Boolean = {
+    hostsByRack.get(rack) != None
+  }
+
   def addExecutor(execId: String, host: String) {
     executors.put(execId, host)
+    val executorsOnHost = executorsByHost.getOrElseUpdate(host, new 
mutable.HashSet[String])
+    executorsOnHost += execId
+    executorIdToHost += execId -> host
     for (rack <- getRackForHost(host)) {
       hostsByRack.getOrElseUpdate(rack, new mutable.HashSet[String]()) += host
     }
@@ -123,7 +147,7 @@ class LargeTask(stageId: Int) extends 
Task[Array[Byte]](stageId, 0) {
 }
 
 class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging 
{
-  import TaskLocality.{ANY, PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL}
+  import TaskLocality.{ANY, PROCESS_LOCAL, NO_PREF, NODE_LOCAL, RACK_LOCAL}
 
   private val conf = new SparkConf
 
@@ -134,18 +158,13 @@ class TaskSetManagerSuite extends FunSuite with 
LocalSparkContext with Logging {
     sc = new SparkContext("local", "test")
     val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
     val taskSet = FakeTask.createTaskSet(1)
-    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
+    val clock = new FakeClock
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
 
-    // Offer a host with process-local as the constraint; this should work 
because the TaskSet
-    // above won't have any locality preferences
-    val taskOption = manager.resourceOffer("exec1", "host1", 
TaskLocality.PROCESS_LOCAL)
+    // Offer a host with NO_PREF as the constraint,
+    // we should get a nopref task immediately since that's what we only have
+    var taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)
     assert(taskOption.isDefined)
-    val task = taskOption.get
-    assert(task.executorId === "exec1")
-    assert(sched.startedTasks.contains(0))
-
-    // Re-offer the host -- now we should get no more tasks
-    assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) === None)
 
     // Tell it the task has finished
     manager.handleSuccessfulTask(0, createTaskResult(0))
@@ -161,7 +180,7 @@ class TaskSetManagerSuite extends FunSuite with 
LocalSparkContext with Logging {
 
     // First three offers should all find tasks
     for (i <- 0 until 3) {
-      val taskOption = manager.resourceOffer("exec1", "host1", PROCESS_LOCAL)
+      var taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)
       assert(taskOption.isDefined)
       val task = taskOption.get
       assert(task.executorId === "exec1")
@@ -169,7 +188,7 @@ class TaskSetManagerSuite extends FunSuite with 
LocalSparkContext with Logging {
     assert(sched.startedTasks.toSet === Set(0, 1, 2))
 
     // Re-offer the host -- now we should get no more tasks
-    assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) === None)
+    assert(manager.resourceOffer("exec1", "host1", NO_PREF) === None)
 
     // Finish the first two tasks
     manager.handleSuccessfulTask(0, createTaskResult(0))
@@ -211,37 +230,40 @@ class TaskSetManagerSuite extends FunSuite with 
LocalSparkContext with Logging {
     )
     val clock = new FakeClock
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
-
     // First offer host1, exec1: first task should be chosen
     assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
-
-    // Offer host1, exec1 again: the last task, which has no prefs, should be 
chosen
-    assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 3)
-
-    // Offer host1, exec1 again, at PROCESS_LOCAL level: nothing should get 
chosen
-    assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) === None)
+    assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) == None)
 
     clock.advance(LOCALITY_WAIT)
-
-    // Offer host1, exec1 again, at PROCESS_LOCAL level: nothing should get 
chosen
-    assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) === None)
-
-    // Offer host1, exec1 again, at NODE_LOCAL level: we should choose task 2
+    // Offer host1, exec1 again, at NODE_LOCAL level: the node local (task 2) 
should
+    // get chosen before the noPref task
     assert(manager.resourceOffer("exec1", "host1", NODE_LOCAL).get.index == 2)
 
-    // Offer host1, exec1 again, at NODE_LOCAL level: nothing should get chosen
-    assert(manager.resourceOffer("exec1", "host1", NODE_LOCAL) === None)
-
-    // Offer host1, exec1 again, at ANY level: nothing should get chosen
-    assert(manager.resourceOffer("exec1", "host1", ANY) === None)
+    // Offer host2, exec3 again, at NODE_LOCAL level: we should choose task 2
+    assert(manager.resourceOffer("exec2", "host2", NODE_LOCAL).get.index == 1)
 
+    // Offer host2, exec3 again, at NODE_LOCAL level: we should get noPref task
+    // after failing to find a node_Local task
+    assert(manager.resourceOffer("exec2", "host2", NODE_LOCAL) == None)
     clock.advance(LOCALITY_WAIT)
+    assert(manager.resourceOffer("exec2", "host2", NO_PREF).get.index == 3)
+  }
 
-    // Offer host1, exec1 again, at ANY level: task 1 should get chosen
-    assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 1)
-
-    // Offer host1, exec1 again, at ANY level: nothing should be chosen as 
we've launched all tasks
-    assert(manager.resourceOffer("exec1", "host1", ANY) === None)
+  test("we do not need to delay scheduling when we only have noPref tasks in 
the queue") {
+    sc = new SparkContext("local", "test")
+    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec3", 
"host2"))
+    val taskSet = FakeTask.createTaskSet(3,
+      Seq(TaskLocation("host1", "exec1")),
+      Seq(TaskLocation("host2", "exec3")),
+      Seq()   // Last task has no locality prefs
+    )
+    val clock = new FakeClock
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+    // First offer host1, exec1: first task should be chosen
+    assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL).get.index 
=== 0)
+    assert(manager.resourceOffer("exec3", "host2", PROCESS_LOCAL).get.index 
=== 1)
+    assert(manager.resourceOffer("exec3", "host2", NODE_LOCAL) == None)
+    assert(manager.resourceOffer("exec3", "host2", NO_PREF).get.index === 2)
   }
 
   test("delay scheduling with fallback") {
@@ -298,20 +320,24 @@ class TaskSetManagerSuite extends FunSuite with 
LocalSparkContext with Logging {
     // First offer host1: first task should be chosen
     assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
 
-    // Offer host1 again: third task should be chosen immediately because 
host3 is not up
-    assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 2)
-
-    // After this, nothing should get chosen
+    // After this, nothing should get chosen, because we have separated tasks 
with unavailable preference
+    // from the noPrefPendingTasks
     assert(manager.resourceOffer("exec1", "host1", ANY) === None)
 
     // Now mark host2 as dead
     sched.removeExecutor("exec2")
     manager.executorLost("exec2", "host2")
 
-    // Task 1 should immediately be launched on host1 because its original 
host is gone
+    // nothing should be chosen
+    assert(manager.resourceOffer("exec1", "host1", ANY) === None)
+
+    clock.advance(LOCALITY_WAIT * 2)
+
+    // task 1 and 2 would be scheduled as nonLocal task
     assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 1)
+    assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 2)
 
-    // Now that all tasks have launched, nothing new should be launched 
anywhere else
+    // all finished
     assert(manager.resourceOffer("exec1", "host1", ANY) === None)
     assert(manager.resourceOffer("exec2", "host2", ANY) === None)
   }
@@ -373,7 +399,7 @@ class TaskSetManagerSuite extends FunSuite with 
LocalSparkContext with Logging {
     val manager = new TaskSetManager(sched, taskSet, 4, clock)
 
     {
-      val offerResult = manager.resourceOffer("exec1", "host1", 
TaskLocality.PROCESS_LOCAL)
+      val offerResult = manager.resourceOffer("exec1", "host1", PROCESS_LOCAL)
       assert(offerResult.isDefined, "Expect resource offer to return a task")
 
       assert(offerResult.get.index === 0)
@@ -384,15 +410,15 @@ class TaskSetManagerSuite extends FunSuite with 
LocalSparkContext with Logging {
       assert(!sched.taskSetsFailed.contains(taskSet.id))
 
       // Ensure scheduling on exec1 fails after failure 1 due to blacklist
-      assert(manager.resourceOffer("exec1", "host1", 
TaskLocality.PROCESS_LOCAL).isEmpty)
-      assert(manager.resourceOffer("exec1", "host1", 
TaskLocality.NODE_LOCAL).isEmpty)
-      assert(manager.resourceOffer("exec1", "host1", 
TaskLocality.RACK_LOCAL).isEmpty)
-      assert(manager.resourceOffer("exec1", "host1", TaskLocality.ANY).isEmpty)
+      assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL).isEmpty)
+      assert(manager.resourceOffer("exec1", "host1", NODE_LOCAL).isEmpty)
+      assert(manager.resourceOffer("exec1", "host1", RACK_LOCAL).isEmpty)
+      assert(manager.resourceOffer("exec1", "host1", ANY).isEmpty)
     }
 
     // Run the task on exec1.1 - should work, and then fail it on exec1.1
     {
-      val offerResult = manager.resourceOffer("exec1.1", "host1", 
TaskLocality.NODE_LOCAL)
+      val offerResult = manager.resourceOffer("exec1.1", "host1", NODE_LOCAL)
       assert(offerResult.isDefined,
         "Expect resource offer to return a task for exec1.1, offerResult = " + 
offerResult)
 
@@ -404,12 +430,12 @@ class TaskSetManagerSuite extends FunSuite with 
LocalSparkContext with Logging {
       assert(!sched.taskSetsFailed.contains(taskSet.id))
 
       // Ensure scheduling on exec1.1 fails after failure 2 due to blacklist
-      assert(manager.resourceOffer("exec1.1", "host1", 
TaskLocality.NODE_LOCAL).isEmpty)
+      assert(manager.resourceOffer("exec1.1", "host1", NODE_LOCAL).isEmpty)
     }
 
     // Run the task on exec2 - should work, and then fail it on exec2
     {
-      val offerResult = manager.resourceOffer("exec2", "host2", 
TaskLocality.ANY)
+      val offerResult = manager.resourceOffer("exec2", "host2", ANY)
       assert(offerResult.isDefined, "Expect resource offer to return a task")
 
       assert(offerResult.get.index === 0)
@@ -420,20 +446,20 @@ class TaskSetManagerSuite extends FunSuite with 
LocalSparkContext with Logging {
       assert(!sched.taskSetsFailed.contains(taskSet.id))
 
       // Ensure scheduling on exec2 fails after failure 3 due to blacklist
-      assert(manager.resourceOffer("exec2", "host2", TaskLocality.ANY).isEmpty)
+      assert(manager.resourceOffer("exec2", "host2", ANY).isEmpty)
     }
 
     // After reschedule delay, scheduling on exec1 should be possible.
     clock.advance(rescheduleDelay)
 
     {
-      val offerResult = manager.resourceOffer("exec1", "host1", 
TaskLocality.PROCESS_LOCAL)
+      val offerResult = manager.resourceOffer("exec1", "host1", PROCESS_LOCAL)
       assert(offerResult.isDefined, "Expect resource offer to return a task")
 
       assert(offerResult.get.index === 0)
       assert(offerResult.get.executorId === "exec1")
 
-      assert(manager.resourceOffer("exec1", "host1", 
TaskLocality.PROCESS_LOCAL).isEmpty)
+      assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL).isEmpty)
 
       // Cause exec1 to fail : failure 4
       manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, 
TaskResultLost)
@@ -443,7 +469,7 @@ class TaskSetManagerSuite extends FunSuite with 
LocalSparkContext with Logging {
     assert(sched.taskSetsFailed.contains(taskSet.id))
   }
 
-  test("new executors get added") {
+  test("new executors get added and lost") {
     // Assign host2 to rack2
     FakeRackUtil.cleanUp()
     FakeRackUtil.assignHostToRack("host2", "rack2")
@@ -456,26 +482,25 @@ class TaskSetManagerSuite extends FunSuite with 
LocalSparkContext with Logging {
       Seq())
     val clock = new FakeClock
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
-    // All tasks added to no-pref list since no preferred location is available
-    assert(manager.pendingTasksWithNoPrefs.size === 4)
     // Only ANY is valid
-    assert(manager.myLocalityLevels.sameElements(Array(ANY)))
+    assert(manager.myLocalityLevels.sameElements(Array(NO_PREF, ANY)))
     // Add a new executor
     sched.addExecutor("execD", "host1")
     manager.executorAdded()
-    // Task 0 and 1 should be removed from no-pref list
-    assert(manager.pendingTasksWithNoPrefs.size === 2)
     // Valid locality should contain NODE_LOCAL and ANY
-    assert(manager.myLocalityLevels.sameElements(Array(NODE_LOCAL, ANY)))
+    assert(manager.myLocalityLevels.sameElements(Array(NODE_LOCAL, NO_PREF, 
ANY)))
     // Add another executor
     sched.addExecutor("execC", "host2")
     manager.executorAdded()
-    // No-pref list now only contains task 3
-    assert(manager.pendingTasksWithNoPrefs.size === 1)
     // Valid locality should contain PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL and 
ANY
-    assert(manager.myLocalityLevels.sameElements(
-      Array(PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY)))
-    FakeRackUtil.cleanUp()
+    assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, 
NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY)))
+    // test if the valid locality is recomputed when the executor is lost
+    sched.removeExecutor("execC")
+    manager.executorLost("execC", "host2")
+    assert(manager.myLocalityLevels.sameElements(Array(NODE_LOCAL, NO_PREF, 
ANY)))
+    sched.removeExecutor("execD")
+    manager.executorLost("execD", "host1")
+    assert(manager.myLocalityLevels.sameElements(Array(NO_PREF, ANY)))
   }
 
   test("test RACK_LOCAL tasks") {
@@ -506,7 +531,6 @@ class TaskSetManagerSuite extends FunSuite with 
LocalSparkContext with Logging {
     // Offer host2
     // Task 1 can be scheduled with RACK_LOCAL
     assert(manager.resourceOffer("execB", "host2", RACK_LOCAL).get.index === 1)
-    FakeRackUtil.cleanUp()
   }
 
   test("do not emit warning when serialized task is small") {
@@ -536,6 +560,53 @@ class TaskSetManagerSuite extends FunSuite with 
LocalSparkContext with Logging {
     assert(manager.emittedTaskSizeWarning)
   }
 
+  test("speculative and noPref task should be scheduled after node-local") {
+    sc = new SparkContext("local", "test")
+    val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", 
"host2"), ("execC", "host3"))
+    val taskSet = FakeTask.createTaskSet(4,
+      Seq(TaskLocation("host1", "execA")),
+      Seq(TaskLocation("host2"), TaskLocation("host1")),
+      Seq(),
+      Seq(TaskLocation("host3", "execC")))
+    val clock = new FakeClock
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+
+    assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL).get.index 
=== 0)
+    assert(manager.resourceOffer("execA", "host1", NODE_LOCAL) == None)
+    assert(manager.resourceOffer("execA", "host1", NO_PREF).get.index == 1)
+
+    manager.speculatableTasks += 1
+    clock.advance(LOCALITY_WAIT)
+    // schedule the nonPref task
+    assert(manager.resourceOffer("execA", "host1", NO_PREF).get.index === 2)
+    // schedule the speculative task
+    assert(manager.resourceOffer("execB", "host2", NO_PREF).get.index === 1)
+    clock.advance(LOCALITY_WAIT * 3)
+    // schedule non-local tasks
+    assert(manager.resourceOffer("execB", "host2", ANY).get.index === 3)
+  }
+
+  test("node-local tasks should be scheduled right away when there are only 
node-local and no-preference tasks") {
+    sc = new SparkContext("local", "test")
+    val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", 
"host2"), ("execC", "host3"))
+    val taskSet = FakeTask.createTaskSet(4,
+      Seq(TaskLocation("host1")),
+      Seq(TaskLocation("host2")),
+      Seq(),
+      Seq(TaskLocation("host3")))
+    val clock = new FakeClock
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+
+    // node-local tasks are scheduled without delay
+    assert(manager.resourceOffer("execA", "host1", NODE_LOCAL).get.index === 0)
+    assert(manager.resourceOffer("execA", "host2", NODE_LOCAL).get.index === 1)
+    assert(manager.resourceOffer("execA", "host3", NODE_LOCAL).get.index === 3)
+    assert(manager.resourceOffer("execA", "host3", NODE_LOCAL) === None)
+
+    // schedule no-preference after node local ones
+    assert(manager.resourceOffer("execA", "host3", NO_PREF).get.index === 2)
+  }
+
   def createTaskResult(id: Int): DirectTaskResult[Int] = {
     val valueSer = SparkEnv.get.serializer.newInstance()
     new DirectTaskResult[Int](valueSer.serialize(id), mutable.Map.empty, new 
TaskMetrics)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to