Repository: spark
Updated Branches:
  refs/heads/master ce5fd4008 -> 38d9795a4


[SPARK-10248][CORE] track exceptions in dagscheduler event loop in tests

`DAGSchedulerEventLoop` normally only logs errors (so it can continue to 
process events from other jobs).  However, this is not desirable in the 
tests -- a test should be able to easily detect any exception, and it 
shouldn't silently succeed when one occurs.

This was suggested by mateiz on https://github.com/apache/spark/pull/7699.  It 
may have already turned up an issue in "zero split job".

Author: Imran Rashid <iras...@cloudera.com>

Closes #8466 from squito/SPARK-10248.
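
In outline, the fix is to override the event loop's error hook in the
test-only subclass, so a failure stops the scheduler and propagates to the
test instead of just being logged.  A condensed sketch of what the
DAGSchedulerSuite diff below adds (same names as the patch):

    class DAGSchedulerEventProcessLoopTester(dagScheduler: DAGScheduler)
      extends DAGSchedulerEventProcessLoop(dagScheduler) {

      override def onError(e: Throwable): Unit = {
        logError("Error in DAGSchedulerEventLoop: ", e)
        dagScheduler.stop()  // stop the scheduler so later events can't mask the failure
        throw e              // rethrow so the test (e.g. intercept[Exception]) sees it
      }
    }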


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/38d9795a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/38d9795a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/38d9795a

Branch: refs/heads/master
Commit: 38d9795a4fa07086d65ff705ce86648345618736
Parents: ce5fd40
Author: Imran Rashid <iras...@cloudera.com>
Authored: Wed Dec 16 19:01:05 2015 -0800
Committer: Andrew Or <and...@databricks.com>
Committed: Wed Dec 16 19:01:05 2015 -0800

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   |  5 ++--
 .../spark/scheduler/DAGSchedulerSuite.scala     | 28 ++++++++++++++++++--
 2 files changed, 29 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/38d9795a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 8d0e0c8..b128ed5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -805,7 +805,8 @@ class DAGScheduler(
 
   private[scheduler] def cleanUpAfterSchedulerStop() {
     for (job <- activeJobs) {
-      val error = new SparkException("Job cancelled because SparkContext was shut down")
+      val error =
+        new SparkException(s"Job ${job.jobId} cancelled because SparkContext was shut down")
       job.listener.jobFailed(error)
       // Tell the listeners that all of the running stages have ended.  Don't bother
       // cancelling the stages because if the DAG scheduler is stopped, the entire application
@@ -1295,7 +1296,7 @@ class DAGScheduler(
       case TaskResultLost =>
        // Do nothing here; the TaskScheduler handles these failures and resubmits the task.
 
-      case other =>
+      case _: ExecutorLostFailure | TaskKilled | UnknownReason =>
         // Unrecognized failure - also do nothing. If the task fails repeatedly, the TaskScheduler
         // will abort the job.
     }
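
A note on the hunk above: TaskEndReason is a sealed hierarchy, so replacing
the `case other =>` catch-all with the explicit remaining cases lets the
compiler check the match for exhaustiveness, and an unexpected value -- such
as the null reason injected by the new "catch errors in event loop" test
below -- now throws a MatchError, which the rethrowing test loop surfaces.
A toy illustration of the pattern, with hypothetical names rather than
Spark's real classes:

    sealed trait EndReason
    case object Killed extends EndReason
    case object Unknown extends EndReason

    def handle(reason: EndReason): Unit = reason match {
      // exhaustive over the sealed trait: no catch-all needed, and
      // handle(null) throws a MatchError instead of passing silently
      case Killed | Unknown => ()
    }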

http://git-wip-us.apache.org/repos/asf/spark/blob/38d9795a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 653d41f..2869f0f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -45,6 +45,13 @@ class DAGSchedulerEventProcessLoopTester(dagScheduler: DAGScheduler)
       case NonFatal(e) => onError(e)
     }
   }
+
+  override def onError(e: Throwable): Unit = {
+    logError("Error in DAGSchedulerEventLoop: ", e)
+    dagScheduler.stop()
+    throw e
+  }
+
 }
 
 /**
@@ -300,13 +307,18 @@ class DAGSchedulerSuite
 
   test("zero split job") {
     var numResults = 0
+    var failureReason: Option[Exception] = None
     val fakeListener = new JobListener() {
-      override def taskSucceeded(partition: Int, value: Any) = numResults += 1
-      override def jobFailed(exception: Exception) = throw exception
+      override def taskSucceeded(partition: Int, value: Any): Unit = numResults += 1
+      override def jobFailed(exception: Exception): Unit = {
+        failureReason = Some(exception)
+      }
     }
     val jobId = submit(new MyRDD(sc, 0, Nil), Array(), listener = fakeListener)
     assert(numResults === 0)
     cancel(jobId)
+    assert(failureReason.isDefined)
+    assert(failureReason.get.getMessage() === "Job 0 cancelled ")
   }
 
   test("run trivial job") {
@@ -1675,6 +1687,18 @@ class DAGSchedulerSuite
     assert(stackTraceString.contains("org.scalatest.FunSuite"))
   }
 
+  test("catch errors in event loop") {
+    // this is a test of our testing framework -- make sure errors in event loop don't get ignored
+
+    // just run some bad event that will throw an exception -- we'll give a null TaskEndReason
+    val rdd1 = new MyRDD(sc, 1, Nil)
+    submit(rdd1, Array(0))
+    intercept[Exception] {
+      complete(taskSets(0), Seq(
+        (null, makeMapStatus("hostA", 1))))
+    }
+  }
+
   test("simple map stage submission") {
     val shuffleMapRdd = new MyRDD(sc, 2, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1))

