Repository: spark
Updated Branches:
  refs/heads/master 56c771cb2 -> 8d271c90f
SPARK-1929 DAGScheduler suspended by local task OOM

DAGScheduler does not handle local task OOM properly, and will wait for the job result forever.

Author: Zhen Peng <[email protected]>

Closes #883 from zhpengg/bugfix-dag-scheduler-oom and squashes the following commits:

76f7eda [Zhen Peng] remove redundant memory allocations
aa63161 [Zhen Peng] SPARK-1929 DAGScheduler suspended by local task OOM


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8d271c90
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8d271c90
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8d271c90

Branch: refs/heads/master
Commit: 8d271c90fa496cb24e2b7362ef0497563591b97d
Parents: 56c771c
Author: Zhen Peng <[email protected]>
Authored: Mon May 26 21:30:25 2014 -0700
Committer: Reynold Xin <[email protected]>
Committed: Mon May 26 21:30:25 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/scheduler/DAGScheduler.scala   |  6 +++++-
 .../apache/spark/scheduler/DAGSchedulerSuite.scala  | 14 ++++++++++++++
 2 files changed, 19 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8d271c90/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index ff411e2..c70aa0e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.scheduler
 
-import java.io.NotSerializableException
+import java.io.{NotSerializableException, PrintWriter, StringWriter}
 import java.util.Properties
 import java.util.concurrent.atomic.AtomicInteger
 
@@ -580,6 +580,10 @@ class DAGScheduler(
       case e: Exception =>
         jobResult = JobFailed(e)
         job.listener.jobFailed(e)
+      case oom: OutOfMemoryError =>
+        val exception = new SparkException("job failed for Out of memory exception", oom)
+        jobResult = JobFailed(exception)
+        job.listener.jobFailed(exception)
     } finally {
       val s = job.finalStage
       stageIdToJobIds -= s.id    // clean up data structures that were populated for a local job,
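[Editor's note] Why the new case is needed: OutOfMemoryError extends
java.lang.Error, not Exception, so the pre-existing "case e: Exception" in
the local-execution path never matched it. The local-job thread died
silently, job.listener.jobFailed() was never called, and the JobWaiter
blocked on the result forever. Below is a minimal, self-contained Scala
sketch of that hang pattern (illustrative only, not Spark code; all names
are made up):

    import java.util.concurrent.CountDownLatch

    object LocalJobHangSketch {
      def main(args: Array[String]): Unit = {
        val done = new CountDownLatch(1) // stands in for the JobWaiter
        val worker = new Thread(new Runnable {
          override def run(): Unit = {
            try {
              throw new OutOfMemoryError("simulated task OOM")
            } catch {
              case e: Exception =>
                // never reached: OutOfMemoryError is an Error, not an Exception
                done.countDown()
              case oom: OutOfMemoryError =>
                // the case this patch adds; without it the latch is never
                // released and await() below blocks forever
                done.countDown()
            }
          }
        })
        worker.start()
        done.await()
        println("failure delivered to the waiter; no hang")
      }
    }
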
http://git-wip-us.apache.org/repos/asf/spark/blob/8d271c90/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index d172dd1..81e64c1 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -256,6 +256,20 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
     assertDataStructuresEmpty
   }
 
+  test("local job oom") {
+    val rdd = new MyRDD(sc, Nil) {
+      override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
+        throw new java.lang.OutOfMemoryError("test local job oom")
+      override def getPartitions = Array( new Partition { override def index = 0 } )
+      override def getPreferredLocations(split: Partition) = Nil
+      override def toString = "DAGSchedulerSuite Local RDD"
+    }
+    val jobId = scheduler.nextJobId.getAndIncrement()
+    runEvent(JobSubmitted(jobId, rdd, jobComputeFunc, Array(0), true, null, jobListener))
+    assert(results.size == 0)
+    assertDataStructuresEmpty
+  }
+
   test("run trivial job w/ dependency") {
     val baseRdd = makeRdd(1, Nil)
     val finalRdd = makeRdd(1, List(new OneToOneDependency(baseRdd)))
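[Editor's note] Caller-visible effect: the test above submits a local job
(the "true" argument to JobSubmitted is allowLocal) whose compute() throws
OutOfMemoryError, then checks that no results arrive and that scheduler
state is cleaned up rather than left waiting. A hypothetical caller-side
sketch of the same behavior (not part of this commit; assumes take(1) on a
one-partition RDD runs as a local job, and the oversized allocation is only
an assumed way to trigger the OOM):

    import org.apache.spark.{SparkContext, SparkException}

    object LocalOomCallerSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext("local", "oom-demo")
        try {
          sc.parallelize(1 to 10, 1)
            .map(_ => new Array[Byte](Int.MaxValue)) // hypothetical OOM trigger
            .take(1)
        } catch {
          case e: SparkException =>
            // before SPARK-1929 this handler was never reached for a
            // local-task OOM; the driver simply waited forever
            println("job failed: " + e.getMessage)
        } finally {
          sc.stop()
        }
      }
    }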
