Updated Branches: refs/heads/branch-0.8 96670e716 -> 0f6278641
Merge pull request #143 from rxin/scheduler-hang Ignore a task status update if the executor no longer exists. Otherwise, if the scheduler receives a task status update after the executor has been removed, the scheduler would hang. It is hard to add unit tests for this right now because the cluster scheduler is difficult to mock. We should do that once @kayousterhout finishes merging the local scheduler and the cluster scheduler. (cherry picked from commit bf4e6131cceef4fe00fb5693117c0732f181dbd9) Signed-off-by: Reynold Xin <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/0f627864 Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/0f627864 Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/0f627864 Branch: refs/heads/branch-0.8 Commit: 0f627864171c83d32b73394b361d1b4aa8dac95b Parents: 96670e7 Author: Reynold Xin <[email protected]> Authored: Tue Nov 5 23:14:09 2013 -0800 Committer: Reynold Xin <[email protected]> Committed: Tue Nov 5 23:14:28 2013 -0800 ---------------------------------------------------------------------- .../cluster/CoarseGrainedSchedulerBackend.scala | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f627864/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 70f3f88..a45bee5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -87,8 +87,14 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac case StatusUpdate(executorId, taskId, state, data) => scheduler.statusUpdate(taskId, state, data.value) if (TaskState.isFinished(state)) { - freeCores(executorId) += 1 - makeOffers(executorId) + if (executorActor.contains(executorId)) { + freeCores(executorId) += 1 + makeOffers(executorId) + } else { + // Ignoring the update since we don't know about the executor. + val msg = "Ignored task status update (%d state %s) from unknown executor %s with ID %s" + logWarning(msg.format(taskId, state, sender, executorId)) + } } case ReviveOffers => @@ -175,7 +181,9 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME) } - private val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds") + private val timeout = { + Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds") + } def stopExecutors() { try {
