Updated Branches:
  refs/heads/master 60e23a58b -> 740922f25

Scheduler quits when createStage fails.

The current scheduler thread does not handle exceptions thrown by createStage 
while launching new jobs. The thread dies on any exception raised at that 
level, leaving the cluster hanging with no scheduler.
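
For context, a minimal, self-contained sketch of the pattern this fix introduces is 
below; the names (SchedulerLoop, JobListener, JobSubmitted, newStage) are illustrative 
stand-ins, not the actual Spark classes. Stage creation is wrapped in try/catch, the 
exception is forwarded to the submitting job's listener, and the event loop keeps 
running, so a single bad job no longer takes the scheduler down with it.

  // Illustrative sketch only -- hypothetical names, not the actual DAGScheduler code.
  trait JobListener {
    def jobFailed(e: Exception): Unit
  }

  final case class JobSubmitted(jobId: Int, listener: JobListener)

  class SchedulerLoop {
    private def newStage(jobId: Int): String = {
      // Stage construction can throw, e.g. while resolving RDD dependencies.
      if (jobId < 0) throw new IllegalArgumentException("cannot build a stage for job " + jobId)
      "stage-for-" + jobId
    }

    // Mirrors the diff's convention: the return value says whether the whole
    // scheduler should stop; a failed stage build only fails that one job.
    def processEvent(event: JobSubmitted): Boolean = {
      var finalStage: String = null
      try {
        finalStage = newStage(event.jobId)
      } catch {
        case e: Exception =>
          // Report the failure to the submitting job, but keep the scheduler alive.
          event.listener.jobFailed(e)
          return false
      }
      println("Launching job " + event.jobId + " with final stage " + finalStage)
      false
    }
  }

The diff below applies the same idea inside processEvent in DAGScheduler.scala: 
newStage is wrapped in try/catch, the exception is reported via listener.jobFailed(e), 
and processEvent returns without stopping the scheduler.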


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/4d53830e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/4d53830e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/4d53830e

Branch: refs/heads/master
Commit: 4d53830eb79174cfd9641f6342727bc980d5c3e0
Parents: 743a31a
Author: Sundeep Narravula <[email protected]>
Authored: Sat Nov 30 16:18:12 2013 -0800
Committer: Sundeep Narravula <[email protected]>
Committed: Sat Nov 30 16:18:12 2013 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/scheduler/DAGScheduler.scala   | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4d53830e/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 4457525..f6a4482 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -384,7 +384,15 @@ class DAGScheduler(
   private[scheduler] def processEvent(event: DAGSchedulerEvent): Boolean = {
     event match {
       case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
-        val finalStage = newStage(rdd, partitions.size, None, jobId, Some(callSite))
+        var finalStage: Stage = null
+        try {
+          finalStage = newStage(rdd, partitions.size, None, jobId, Some(callSite))
+        } catch {
+          case e: Exception =>
+            logWarning("Creating new stage failed due to exception - job: " + jobId)
+            listener.jobFailed(e)
+            return false
+        }
         val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
         clearCacheLocs()
         logInfo("Got job " + job.jobId + " (" + callSite + ") with " + partitions.length +
