Repository: spark
Updated Branches:
  refs/heads/master c262cd5dd -> 61ca7742d


[SPARK-4020] Do not rely on timeouts to remove failed block managers

If an executor fails without ever having been scheduled to run a task, 
`DAGScheduler` does not notify `BlockManagerMasterActor` that the associated 
block manager should be removed. Instead, that block manager is expired only 
after a few rounds of heartbeat timeouts. For the purposes of removal, there 
should be no distinction between executors that have been scheduled tasks and 
those that have not.

The fix, then, is to add every known executor to `TaskSchedulerImpl`'s 
`activeExecutorIds`, whether or not it has been scheduled a task. In fact, the 
existing comment above `activeExecutorIds` is
```
// Which executor IDs we have executors on
val activeExecutorIds = new HashSet[String]
```
and not "Which executors have been scheduled tasks thus far."
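
As a rough illustration of the intended behavior, the following is a minimal, 
self-contained sketch and not Spark's actual code. `MiniScheduler`, the 
simplified `WorkerOffer`, and the Boolean result of `executorLost` are 
hypothetical stand-ins, and the sketch assumes that loss handling only acts on 
executors found in `activeExecutorIds`.
```scala
import scala.collection.mutable.{HashMap, HashSet}

// Hypothetical, stripped-down stand-in for a resource offer (not Spark's class).
final case class WorkerOffer(executorId: String, host: String, cores: Int)

// Hypothetical miniature of the scheduler's executor bookkeeping.
class MiniScheduler {
  val executorIdToHost = new HashMap[String, String]
  // Which executor IDs we have executors on
  val activeExecutorIds = new HashSet[String]

  // Record every offered executor as active up front,
  // before (and regardless of whether) any task is assigned to it.
  def resourceOffers(offers: Seq[WorkerOffer]): Unit = {
    for (o <- offers) {
      executorIdToHost(o.executorId) = o.host
      activeExecutorIds += o.executorId
    }
  }

  // Assumption: only executors known to be active are cleaned up on loss.
  // Returns true when the executor was known and has been removed, which is
  // the case where block manager removal could be triggered right away.
  def executorLost(executorId: String): Boolean = {
    if (activeExecutorIds.contains(executorId)) {
      activeExecutorIds -= executorId
      executorIdToHost -= executorId
      true
    } else {
      false
    }
  }
}

object Demo {
  def main(args: Array[String]): Unit = {
    val scheduler = new MiniScheduler
    scheduler.resourceOffers(Seq(WorkerOffer("exec-1", "host-a", 4)))
    // exec-1 never ran a task, yet its loss is recognized immediately.
    assert(scheduler.executorLost("exec-1"))
  }
}
```
Under the previous placement, where the ID was added only when a task was 
launched, the same call in this sketch would return false for an executor that 
never ran a task, leaving cleanup to the heartbeat timeout.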

Author: Andrew Or <[email protected]>

Closes #2865 from andrewor14/active-executors and squashes the following 
commits:

ff3172b [Andrew Or] Add all known executors to `activeExecutorIds`


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/61ca7742
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/61ca7742
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/61ca7742

Branch: refs/heads/master
Commit: 61ca7742d21dd66f5a7b3bb826e3aaca6f049b68
Parents: c262cd5
Author: Andrew Or <[email protected]>
Authored: Tue Oct 21 11:22:25 2014 -0700
Committer: Kay Ousterhout <[email protected]>
Committed: Tue Oct 21 11:22:25 2014 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/61ca7742/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 6d697e3..2b39c7f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -221,6 +221,7 @@ private[spark] class TaskSchedulerImpl(
     var newExecAvail = false
     for (o <- offers) {
       executorIdToHost(o.executorId) = o.host
+      activeExecutorIds += o.executorId
       if (!executorsByHost.contains(o.host)) {
         executorsByHost(o.host) = new HashSet[String]()
         executorAdded(o.executorId, o.host)
@@ -261,7 +262,6 @@ private[spark] class TaskSchedulerImpl(
               val tid = task.taskId
               taskIdToTaskSetId(tid) = taskSet.taskSet.id
               taskIdToExecutorId(tid) = execId
-              activeExecutorIds += execId
               executorsByHost(host) += execId
               availableCpus(i) -= CPUS_PER_TASK
               assert(availableCpus(i) >= 0)

