Repository: spark
Updated Branches:
  refs/heads/master c262cd5dd -> 61ca7742d

[SPARK-4020] Do not rely on timeouts to remove failed block managers

If an executor fails without being scheduled to run any tasks, then `DAGScheduler` won't notify `BlockManagerMasterActor` that the associated block manager should be removed. Instead, the associated block manager will be expired only after a few rounds of heartbeat timeouts.

In terms of removal treatment, there should really be no distinction between executors that have been scheduled tasks and those that have not. The fix, then, is to add all known executors to `TaskSchedulerImpl`'s `activeExecutorIds`, whether or not they have been scheduled a task. In fact, the existing comment above `activeExecutorIds` is
```
// Which executor IDs we have executors on
val activeExecutorIds = new HashSet[String]
```
not "Which executors have been scheduled tasks thus far."

Author: Andrew Or <[email protected]>

Closes #2865 from andrewor14/active-executors and squashes the following commits:

ff3172b [Andrew Or] Add all known executors to `activeExecutorIds`

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/61ca7742
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/61ca7742
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/61ca7742

Branch: refs/heads/master
Commit: 61ca7742d21dd66f5a7b3bb826e3aaca6f049b68
Parents: c262cd5
Author: Andrew Or <[email protected]>
Authored: Tue Oct 21 11:22:25 2014 -0700
Committer: Kay Ousterhout <[email protected]>
Committed: Tue Oct 21 11:22:25 2014 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/61ca7742/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 6d697e3..2b39c7f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -221,6 +221,7 @@ private[spark] class TaskSchedulerImpl(
     var newExecAvail = false
     for (o <- offers) {
       executorIdToHost(o.executorId) = o.host
+      activeExecutorIds += o.executorId
       if (!executorsByHost.contains(o.host)) {
         executorsByHost(o.host) = new HashSet[String]()
         executorAdded(o.executorId, o.host)
@@ -261,7 +262,6 @@ private[spark] class TaskSchedulerImpl(
             val tid = task.taskId
             taskIdToTaskSetId(tid) = taskSet.taskSet.id
             taskIdToExecutorId(tid) = execId
-            activeExecutorIds += execId
             executorsByHost(host) += execId
             availableCpus(i) -= CPUS_PER_TASK
             assert(availableCpus(i) >= 0)
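
The effect of moving `activeExecutorIds += ...` from task launch to offer handling can be illustrated with a toy model. This is only a sketch, not Spark code: `Offer`, `MiniScheduler`, and the `registerOnOffer` switch are hypothetical names, and it assumes that executor-loss handling is gated on membership in `activeExecutorIds`, as the commit message implies.

```scala
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}

// Hypothetical stand-in for a resource offer; only the fields used here.
final case class Offer(executorId: String, host: String, cores: Int)

// Toy model of the bookkeeping touched by this patch. `registerOnOffer`
// is an illustrative switch, not a real Spark setting:
//   true  = behaviour after this commit (track the executor when it is offered)
//   false = behaviour before this commit (track it only when a task is launched)
final class MiniScheduler(registerOnOffer: Boolean) {
  val activeExecutorIds = new HashSet[String]
  val executorIdToHost = new HashMap[String, String]

  // Accept offers and launch at most `pendingTasks` tasks, one per offer.
  // Returns the IDs of the executors that received a task.
  def resourceOffers(offers: Seq[Offer], pendingTasks: Int): List[String] = {
    var remaining = pendingTasks
    val launchedOn = ArrayBuffer.empty[String]
    for (o <- offers) {
      executorIdToHost(o.executorId) = o.host
      if (registerOnOffer) activeExecutorIds += o.executorId
      if (remaining > 0 && o.cores > 0) {
        remaining -= 1
        launchedOn += o.executorId
        if (!registerOnOffer) activeExecutorIds += o.executorId
      }
    }
    launchedOn.toList
  }

  // Assumption: loss handling only proceeds for executors known to be active.
  // Returns true if the executor was tracked and has now been removed.
  def executorLost(executorId: String): Boolean =
    activeExecutorIds.remove(executorId)
}
```

With two offers and a single pending task, the pre-patch variant never records the idle executor, so `executorLost` for it returns `false` and nothing downstream would be notified. The post-patch variant records both executors as soon as they are offered.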
