Repository: spark
Updated Branches:
  refs/heads/master 282158914 -> c15b552dd


[SPARK-16106][CORE] TaskSchedulerImpl should properly track executors added to 
existing hosts

## What changes were proposed in this pull request?

TaskSchedulerImpl used to only set `newExecAvailable` when a new *host* was 
added, not when a new executor was added to an existing host.  It also didn't 
update some internal state tracking live executors until a task was scheduled 
on the executor.  This patch changes it to properly update as soon as it knows 
about a new executor.

## How was this patch tested?

added a unit test, ran everything via jenkins.

Author: Imran Rashid <iras...@cloudera.com>

Closes #13826 from squito/SPARK-16106_executorByHosts.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c15b552d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c15b552d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c15b552d

Branch: refs/heads/master
Commit: c15b552dd547a129c7f0d082dab4eebbd64bee02
Parents: 2821589
Author: Imran Rashid <iras...@cloudera.com>
Authored: Mon Jun 27 16:38:03 2016 -0500
Committer: Imran Rashid <iras...@cloudera.com>
Committed: Mon Jun 27 16:38:03 2016 -0500

----------------------------------------------------------------------
 .../spark/scheduler/TaskSchedulerImpl.scala     |   8 +-
 .../scheduler/TaskSchedulerImplSuite.scala      | 168 ++++++++++++-------
 2 files changed, 111 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c15b552d/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 4282606..821e3ee 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -266,7 +266,6 @@ private[spark] class TaskSchedulerImpl(
             taskIdToTaskSetManager(tid) = taskSet
             taskIdToExecutorId(tid) = execId
             executorIdToTaskCount(execId) += 1
-            executorsByHost(host) += execId
             availableCpus(i) -= CPUS_PER_TASK
             assert(availableCpus(i) >= 0)
             launchedTask = true
@@ -293,11 +292,14 @@ private[spark] class TaskSchedulerImpl(
     // Also track if new executor is added
     var newExecAvail = false
     for (o <- offers) {
-      executorIdToHost(o.executorId) = o.host
-      executorIdToTaskCount.getOrElseUpdate(o.executorId, 0)
       if (!executorsByHost.contains(o.host)) {
         executorsByHost(o.host) = new HashSet[String]()
+      }
+      if (!executorIdToTaskCount.contains(o.executorId)) {
+        executorsByHost(o.host) += o.executorId
         executorAdded(o.executorId, o.host)
+        executorIdToHost(o.executorId) = o.host
+        executorIdToTaskCount(o.executorId) = 0
         newExecAvail = true
       }
       for (rack <- getRackForHost(o.host)) {

http://git-wip-us.apache.org/repos/asf/spark/blob/c15b552d/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index a09a602..34b8d55 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.scheduler
 
+import org.scalatest.BeforeAndAfterEach
+
 import org.apache.spark._
 import org.apache.spark.internal.Logging
 
@@ -27,18 +29,63 @@ class FakeSchedulerBackend extends SchedulerBackend {
   def defaultParallelism(): Int = 1
 }
 
-class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with 
Logging {
+class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with 
BeforeAndAfterEach
+    with Logging {
 
-  test("Scheduler does not always schedule tasks on the same workers") {
+
+  var failedTaskSetException: Option[Throwable] = None
+  var failedTaskSetReason: String = null
+  var failedTaskSet = false
+
+  var taskScheduler: TaskSchedulerImpl = null
+  var dagScheduler: DAGScheduler = null
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    failedTaskSet = false
+    failedTaskSetException = None
+    failedTaskSetReason = null
+  }
+
+  override def afterEach(): Unit = {
+    super.afterEach()
+    if (taskScheduler != null) {
+      taskScheduler.stop()
+      taskScheduler = null
+    }
+    if (dagScheduler != null) {
+      dagScheduler.stop()
+      dagScheduler = null
+    }
+  }
+
+  def setupScheduler(confs: (String, String)*): TaskSchedulerImpl = {
     sc = new SparkContext("local", "TaskSchedulerImplSuite")
-    val taskScheduler = new TaskSchedulerImpl(sc)
+    confs.foreach { case (k, v) =>
+      sc.conf.set(k, v)
+    }
+    taskScheduler = new TaskSchedulerImpl(sc)
     taskScheduler.initialize(new FakeSchedulerBackend)
     // Need to initialize a DAGScheduler for the taskScheduler to use for 
callbacks.
-    new DAGScheduler(sc, taskScheduler) {
-      override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
-      override def executorAdded(execId: String, host: String) {}
+    dagScheduler = new DAGScheduler(sc, taskScheduler) {
+      override def taskStarted(task: Task[_], taskInfo: TaskInfo): Unit = {}
+      override def executorAdded(execId: String, host: String): Unit = {}
+      override def taskSetFailed(
+          taskSet: TaskSet,
+          reason: String,
+          exception: Option[Throwable]): Unit = {
+        // Normally the DAGScheduler puts this in the event loop, which will 
eventually fail
+        // dependent jobs
+        failedTaskSet = true
+        failedTaskSetReason = reason
+        failedTaskSetException = exception
+      }
     }
+    taskScheduler
+  }
 
+  test("Scheduler does not always schedule tasks on the same workers") {
+    val taskScheduler = setupScheduler()
     val numFreeCores = 1
     val workerOffers = Seq(new WorkerOffer("executor0", "host0", numFreeCores),
       new WorkerOffer("executor1", "host1", numFreeCores))
@@ -58,20 +105,12 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext with L
     val count = selectedExecutorIds.count(_ == workerOffers(0).executorId)
     assert(count > 0)
     assert(count < numTrials)
+    assert(!failedTaskSet)
   }
 
   test("Scheduler correctly accounts for multiple CPUs per task") {
-    sc = new SparkContext("local", "TaskSchedulerImplSuite")
     val taskCpus = 2
-
-    sc.conf.set("spark.task.cpus", taskCpus.toString)
-    val taskScheduler = new TaskSchedulerImpl(sc)
-    taskScheduler.initialize(new FakeSchedulerBackend)
-    // Need to initialize a DAGScheduler for the taskScheduler to use for 
callbacks.
-    new DAGScheduler(sc, taskScheduler) {
-      override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
-      override def executorAdded(execId: String, host: String) {}
-    }
+    val taskScheduler = setupScheduler("spark.task.cpus" -> taskCpus.toString)
     // Give zero core offers. Should not generate any tasks
     val zeroCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", 0),
       new WorkerOffer("executor1", "host1", 0))
@@ -96,22 +135,13 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext with L
     taskDescriptions = 
taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten
     assert(1 === taskDescriptions.length)
     assert("executor0" === taskDescriptions(0).executorId)
+    assert(!failedTaskSet)
   }
 
   test("Scheduler does not crash when tasks are not serializable") {
-    sc = new SparkContext("local", "TaskSchedulerImplSuite")
     val taskCpus = 2
-
-    sc.conf.set("spark.task.cpus", taskCpus.toString)
-    val taskScheduler = new TaskSchedulerImpl(sc)
-    taskScheduler.initialize(new FakeSchedulerBackend)
-    // Need to initialize a DAGScheduler for the taskScheduler to use for 
callbacks.
-    val dagScheduler = new DAGScheduler(sc, taskScheduler) {
-      override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
-      override def executorAdded(execId: String, host: String) {}
-    }
+    val taskScheduler = setupScheduler("spark.task.cpus" -> taskCpus.toString)
     val numFreeCores = 1
-    taskScheduler.setDAGScheduler(dagScheduler)
     val taskSet = new TaskSet(
       Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 
1)), 0, 0, 0, null)
     val multiCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", 
taskCpus),
@@ -119,26 +149,20 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext with L
     taskScheduler.submitTasks(taskSet)
     var taskDescriptions = 
taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten
     assert(0 === taskDescriptions.length)
+    assert(failedTaskSet)
+    assert(failedTaskSetReason.contains("Failed to serialize task"))
 
     // Now check that we can still submit tasks
-    // Even if one of the tasks has not-serializable tasks, the other task set 
should
+    // Even if one of the task sets has not-serializable tasks, the other task 
set should
     // still be processed without error
-    taskScheduler.submitTasks(taskSet)
     taskScheduler.submitTasks(FakeTask.createTaskSet(1))
+    taskScheduler.submitTasks(taskSet)
     taskDescriptions = 
taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten
     assert(taskDescriptions.map(_.executorId) === Seq("executor0"))
   }
 
   test("refuse to schedule concurrent attempts for the same stage 
(SPARK-8103)") {
-    sc = new SparkContext("local", "TaskSchedulerImplSuite")
-    val taskScheduler = new TaskSchedulerImpl(sc)
-    taskScheduler.initialize(new FakeSchedulerBackend)
-    // Need to initialize a DAGScheduler for the taskScheduler to use for 
callbacks.
-    val dagScheduler = new DAGScheduler(sc, taskScheduler) {
-      override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
-      override def executorAdded(execId: String, host: String) {}
-    }
-    taskScheduler.setDAGScheduler(dagScheduler)
+    val taskScheduler = setupScheduler()
     val attempt1 = FakeTask.createTaskSet(1, 0)
     val attempt2 = FakeTask.createTaskSet(1, 1)
     taskScheduler.submitTasks(attempt1)
@@ -153,17 +177,11 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext with L
     taskScheduler.taskSetManagerForAttempt(attempt2.stageId, 
attempt2.stageAttemptId)
       .get.isZombie = true
     taskScheduler.submitTasks(attempt3)
+    assert(!failedTaskSet)
   }
 
   test("don't schedule more tasks after a taskset is zombie") {
-    sc = new SparkContext("local", "TaskSchedulerImplSuite")
-    val taskScheduler = new TaskSchedulerImpl(sc)
-    taskScheduler.initialize(new FakeSchedulerBackend)
-    // Need to initialize a DAGScheduler for the taskScheduler to use for 
callbacks.
-    new DAGScheduler(sc, taskScheduler) {
-      override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
-      override def executorAdded(execId: String, host: String) {}
-    }
+    val taskScheduler = setupScheduler()
 
     val numFreeCores = 1
     val workerOffers = Seq(new WorkerOffer("executor0", "host0", numFreeCores))
@@ -191,17 +209,11 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext with L
     assert(1 === taskDescriptions3.length)
     val mgr = 
taskScheduler.taskIdToTaskSetManager.get(taskDescriptions3(0).taskId).get
     assert(mgr.taskSet.stageAttemptId === 1)
+    assert(!failedTaskSet)
   }
 
   test("if a zombie attempt finishes, continue scheduling tasks for non-zombie 
attempts") {
-    sc = new SparkContext("local", "TaskSchedulerImplSuite")
-    val taskScheduler = new TaskSchedulerImpl(sc)
-    taskScheduler.initialize(new FakeSchedulerBackend)
-    // Need to initialize a DAGScheduler for the taskScheduler to use for 
callbacks.
-    new DAGScheduler(sc, taskScheduler) {
-      override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
-      override def executorAdded(execId: String, host: String) {}
-    }
+    val taskScheduler = setupScheduler()
 
     val numFreeCores = 10
     val workerOffers = Seq(new WorkerOffer("executor0", "host0", numFreeCores))
@@ -236,17 +248,11 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext with L
       val mgr = taskScheduler.taskIdToTaskSetManager.get(task.taskId).get
       assert(mgr.taskSet.stageAttemptId === 1)
     }
+    assert(!failedTaskSet)
   }
 
   test("tasks are not re-scheduled while executor loss reason is pending") {
-    sc = new SparkContext("local", "TaskSchedulerImplSuite")
-    val taskScheduler = new TaskSchedulerImpl(sc)
-    taskScheduler.initialize(new FakeSchedulerBackend)
-    // Need to initialize a DAGScheduler for the taskScheduler to use for 
callbacks.
-    new DAGScheduler(sc, taskScheduler) {
-      override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
-      override def executorAdded(execId: String, host: String) {}
-    }
+    val taskScheduler = setupScheduler()
 
     val e0Offers = Seq(new WorkerOffer("executor0", "host0", 1))
     val e1Offers = Seq(new WorkerOffer("executor1", "host0", 1))
@@ -272,6 +278,44 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext with L
     val taskDescriptions3 = taskScheduler.resourceOffers(e1Offers).flatten
     assert(1 === taskDescriptions3.length)
     assert("executor1" === taskDescriptions3(0).executorId)
+    assert(!failedTaskSet)
   }
 
+  test("SPARK-16106 locality levels updated if executor added to existing 
host") {
+    val taskScheduler = setupScheduler()
+
+    taskScheduler.submitTasks(FakeTask.createTaskSet(2, 0,
+      (0 until 2).map { _ => Seq(TaskLocation("host0", "executor2"))}: _*
+    ))
+
+    val taskDescs = taskScheduler.resourceOffers(Seq(
+      new WorkerOffer("executor0", "host0", 1),
+      new WorkerOffer("executor1", "host1", 1)
+    )).flatten
+    // only schedule one task because of locality
+    assert(taskDescs.size === 1)
+
+    val mgr = taskScheduler.taskIdToTaskSetManager.get(taskDescs(0).taskId).get
+    assert(mgr.myLocalityLevels.toSet === Set(TaskLocality.NODE_LOCAL, 
TaskLocality.ANY))
+    // we should know about both executors, even though we only scheduled 
tasks on one of them
+    assert(taskScheduler.getExecutorsAliveOnHost("host0") === 
Some(Set("executor0")))
+    assert(taskScheduler.getExecutorsAliveOnHost("host1") === 
Some(Set("executor1")))
+
+    // when executor2 is added, we should realize that we can run 
process-local tasks.
+    // And we should know its alive on the host.
+    val secondTaskDescs = taskScheduler.resourceOffers(
+      Seq(new WorkerOffer("executor2", "host0", 1))).flatten
+    assert(secondTaskDescs.size === 1)
+    assert(mgr.myLocalityLevels.toSet ===
+      Set(TaskLocality.PROCESS_LOCAL, TaskLocality.NODE_LOCAL, 
TaskLocality.ANY))
+    assert(taskScheduler.getExecutorsAliveOnHost("host0") === 
Some(Set("executor0", "executor2")))
+    assert(taskScheduler.getExecutorsAliveOnHost("host1") === 
Some(Set("executor1")))
+
+    // And even if we don't have anything left to schedule, another resource 
offer on yet another
+    // executor should also update the set of live executors
+    val thirdTaskDescs = taskScheduler.resourceOffers(
+      Seq(new WorkerOffer("executor3", "host1", 1))).flatten
+    assert(thirdTaskDescs.size === 0)
+    assert(taskScheduler.getExecutorsAliveOnHost("host1") === 
Some(Set("executor1", "executor3")))
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to