Repository: spark
Updated Branches:
  refs/heads/master 56c771cb2 -> 8d271c90f


SPARK-1929 DAGScheduler suspended by local task OOM

DAGScheduler does not handle local task OOM properly, and will wait for the job 
result forever.

Author: Zhen Peng <[email protected]>

Closes #883 from zhpengg/bugfix-dag-scheduler-oom and squashes the following 
commits:

76f7eda [Zhen Peng] remove redundant memory allocations
aa63161 [Zhen Peng] SPARK-1929 DAGScheduler suspended by local task OOM


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8d271c90
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8d271c90
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8d271c90

Branch: refs/heads/master
Commit: 8d271c90fa496cb24e2b7362ef0497563591b97d
Parents: 56c771c
Author: Zhen Peng <[email protected]>
Authored: Mon May 26 21:30:25 2014 -0700
Committer: Reynold Xin <[email protected]>
Committed: Mon May 26 21:30:25 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/scheduler/DAGScheduler.scala     |  6 +++++-
 .../apache/spark/scheduler/DAGSchedulerSuite.scala    | 14 ++++++++++++++
 2 files changed, 19 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8d271c90/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index ff411e2..c70aa0e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.scheduler
 
-import java.io.NotSerializableException
+import java.io.{NotSerializableException, PrintWriter, StringWriter}
 import java.util.Properties
 import java.util.concurrent.atomic.AtomicInteger
 
@@ -580,6 +580,10 @@ class DAGScheduler(
       case e: Exception =>
         jobResult = JobFailed(e)
         job.listener.jobFailed(e)
+      case oom: OutOfMemoryError =>
+        val exception = new SparkException("job failed for Out of memory 
exception", oom)
+        jobResult = JobFailed(exception)
+        job.listener.jobFailed(exception)
     } finally {
       val s = job.finalStage
       stageIdToJobIds -= s.id    // clean up data structures that were 
populated for a local job,

http://git-wip-us.apache.org/repos/asf/spark/blob/8d271c90/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index d172dd1..81e64c1 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -256,6 +256,20 @@ class DAGSchedulerSuite extends 
TestKit(ActorSystem("DAGSchedulerSuite")) with F
     assertDataStructuresEmpty
   }
 
+  test("local job oom") {
+    val rdd = new MyRDD(sc, Nil) {
+      override def compute(split: Partition, context: TaskContext): 
Iterator[(Int, Int)] =
+        throw new java.lang.OutOfMemoryError("test local job oom")
+      override def getPartitions = Array( new Partition { override def index = 
0 } )
+      override def getPreferredLocations(split: Partition) = Nil
+      override def toString = "DAGSchedulerSuite Local RDD"
+    }
+    val jobId = scheduler.nextJobId.getAndIncrement()
+    runEvent(JobSubmitted(jobId, rdd, jobComputeFunc, Array(0), true, null, 
jobListener))
+    assert(results.size == 0)
+    assertDataStructuresEmpty
+  }
+
   test("run trivial job w/ dependency") {
     val baseRdd = makeRdd(1, Nil)
     val finalRdd = makeRdd(1, List(new OneToOneDependency(baseRdd)))

Reply via email to