Repository: spark Updated Branches: refs/heads/master ce5fd4008 -> 38d9795a4
[SPARK-10248][CORE] track exceptions in dagscheduler event loop in tests `DAGSchedulerEventLoop` normally only logs errors (so it can continue to process more events, from other jobs). However, this is not desirable in the tests -- the tests should be able to easily detect any exception, and also shouldn't silently succeed if there is an exception. This was suggested by mateiz on https://github.com/apache/spark/pull/7699. It may have already turned up an issue in "zero split job". Author: Imran Rashid <iras...@cloudera.com> Closes #8466 from squito/SPARK-10248. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/38d9795a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/38d9795a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/38d9795a Branch: refs/heads/master Commit: 38d9795a4fa07086d65ff705ce86648345618736 Parents: ce5fd40 Author: Imran Rashid <iras...@cloudera.com> Authored: Wed Dec 16 19:01:05 2015 -0800 Committer: Andrew Or <and...@databricks.com> Committed: Wed Dec 16 19:01:05 2015 -0800 ---------------------------------------------------------------------- .../apache/spark/scheduler/DAGScheduler.scala | 5 ++-- .../spark/scheduler/DAGSchedulerSuite.scala | 28 ++++++++++++++++++-- 2 files changed, 29 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/38d9795a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 8d0e0c8..b128ed5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -805,7 +805,8 @@ class DAGScheduler( private[scheduler] def cleanUpAfterSchedulerStop() { for (job <- activeJobs) { - val error = new SparkException("Job cancelled because SparkContext was shut down") + val error = + new SparkException(s"Job ${job.jobId} cancelled because SparkContext was shut down") job.listener.jobFailed(error) // Tell the listeners that all of the running stages have ended. Don't bother // cancelling the stages because if the DAG scheduler is stopped, the entire application @@ -1295,7 +1296,7 @@ class DAGScheduler( case TaskResultLost => // Do nothing here; the TaskScheduler handles these failures and resubmits the task. - case other => + case _: ExecutorLostFailure | TaskKilled | UnknownReason => // Unrecognized failure - also do nothing. If the task fails repeatedly, the TaskScheduler // will abort the job. } http://git-wip-us.apache.org/repos/asf/spark/blob/38d9795a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 653d41f..2869f0f 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -45,6 +45,13 @@ class DAGSchedulerEventProcessLoopTester(dagScheduler: DAGScheduler) case NonFatal(e) => onError(e) } } + + override def onError(e: Throwable): Unit = { + logError("Error in DAGSchedulerEventLoop: ", e) + dagScheduler.stop() + throw e + } + } /** @@ -300,13 +307,18 @@ class DAGSchedulerSuite test("zero split job") { var numResults = 0 + var failureReason: Option[Exception] = None val fakeListener = new JobListener() { - override def taskSucceeded(partition: Int, value: Any) = numResults += 1 - override def jobFailed(exception: Exception) = throw exception + override def taskSucceeded(partition: Int, value: Any): Unit = numResults += 1 + override def jobFailed(exception: Exception): Unit = { + failureReason = Some(exception) + } } val jobId = submit(new MyRDD(sc, 0, Nil), Array(), listener = fakeListener) assert(numResults === 0) cancel(jobId) + assert(failureReason.isDefined) + assert(failureReason.get.getMessage() === "Job 0 cancelled ") } test("run trivial job") { @@ -1675,6 +1687,18 @@ class DAGSchedulerSuite assert(stackTraceString.contains("org.scalatest.FunSuite")) } + test("catch errors in event loop") { + // this is a test of our testing framework -- make sure errors in event loop don't get ignored + + // just run some bad event that will throw an exception -- we'll give a null TaskEndReason + val rdd1 = new MyRDD(sc, 1, Nil) + submit(rdd1, Array(0)) + intercept[Exception] { + complete(taskSets(0), Seq( + (null, makeMapStatus("hostA", 1)))) + } + } + test("simple map stage submission") { val shuffleMapRdd = new MyRDD(sc, 2, Nil) val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1)) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org