Updated Branches: refs/heads/branch-0.8 be9c176a8 -> d21266e97
Merge pull request #219 from sundeepn/schedulerexception Scheduler quits when newStage fails The current scheduler thread does not handle exceptions thrown by newStage while launching new jobs. The thread fails on any exception that gets triggered at that level, leaving the cluster hanging with no scheduler. (cherry picked from commit 740922f25d5f81617fbe02c7bcd1610d6426bbef) Signed-off-by: Reynold Xin <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/d21266e9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/d21266e9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/d21266e9 Branch: refs/heads/branch-0.8 Commit: d21266e9710d7b72218508d050fe6e9fc903944c Parents: be9c176 Author: Reynold Xin <[email protected]> Authored: Sun Dec 1 12:46:58 2013 -0800 Committer: Reynold Xin <[email protected]> Committed: Sun Dec 1 12:47:30 2013 -0800 ---------------------------------------------------------------------- .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d21266e9/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 0a34a06..4a54267 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -368,7 +368,17 @@ class DAGScheduler( private[scheduler] def processEvent(event: DAGSchedulerEvent): Boolean = { event match { case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) => 
- val finalStage = newStage(rdd, partitions.size, None, jobId, Some(callSite)) + var finalStage: Stage = null + try { + // New stage creation at times and if its not protected, the scheduler thread is killed. + // e.g. it can fail when jobs are run on HadoopRDD whose underlying hdfs files have been deleted + finalStage = newStage(rdd, partitions.size, None, jobId, Some(callSite)) + } catch { + case e: Exception => + logWarning("Creating new stage failed due to exception - job: " + jobId, e) + listener.jobFailed(e) + return false + } val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties) clearCacheLocs() logInfo("Got job " + job.jobId + " (" + callSite + ") with " + partitions.length +
