Repository: spark
Updated Branches:
  refs/heads/branch-1.5 7f10bd620 -> fcb24387a


[SPARK-10564] ThreadingSuite: assertion failures in threads don't fail the test

This commit ensures that if an assertion fails within a spawned thread, the 
failure ultimately fails the test. Otherwise we risk masking real bugs, because 
assertion failures raised on those threads are never propagated back to the 
test thread.
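
The pattern applied by the patch is simple: catch any Throwable raised on the 
worker thread, remember it, always release the semaphore in a finally block so 
the test thread is never left waiting, and rethrow the captured failure on the 
test thread once the semaphore has been acquired. A minimal, Spark-free sketch 
of that pattern (illustrative only; the object name and the deliberate 
assertion are not from the patch):

    import java.util.concurrent.Semaphore

    // Illustrative sketch: capture a failure raised on a worker thread and
    // rethrow it on the calling thread so the enclosing test actually fails.
    object CaptureThreadFailureExample {
      def main(args: Array[String]): Unit = {
        val sem = new Semaphore(0)
        var throwable: Option[Throwable] = None  // written by the worker, read after acquire()

        new Thread {
          override def run(): Unit = {
            try {
              assert(1 + 1 == 3, "deliberate failure inside the thread")
            } catch {
              case t: Throwable => throwable = Some(t)  // remember the failure
            } finally {
              sem.release()                             // always unblock the caller
            }
          }
        }.start()

        sem.acquire()  // release()/acquire() also makes the worker's write visible here
        throwable.foreach { t => throw t }  // propagate the failure to the calling thread
      }
    }

Note that the sketch catches Throwable rather than Exception because 
Predef.assert throws java.lang.AssertionError, which is an Error, not an 
Exception.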

Author: Andrew Or <and...@databricks.com>

Closes #8723 from andrewor14/fix-threading-suite.

(cherry picked from commit d74c6a143cbd060c25bf14a8d306841b3ec55d03)
Signed-off-by: Andrew Or <and...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fcb24387
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fcb24387
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fcb24387

Branch: refs/heads/branch-1.5
Commit: fcb24387add76529707f27b86b57f79de1110f24
Parents: 7f10bd6
Author: Andrew Or <and...@databricks.com>
Authored: Fri Sep 11 15:02:59 2015 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Fri Sep 11 15:03:07 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/ThreadingSuite.scala | 68 +++++++++++++-------
 1 file changed, 45 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fcb24387/core/src/test/scala/org/apache/spark/ThreadingSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ThreadingSuite.scala b/core/src/test/scala/org/apache/spark/ThreadingSuite.scala
index 48509f0..cda2b24 100644
--- a/core/src/test/scala/org/apache/spark/ThreadingSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ThreadingSuite.scala
@@ -119,23 +119,30 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging {
     val nums = sc.parallelize(1 to 2, 2)
     val sem = new Semaphore(0)
     ThreadingSuiteState.clear()
+    var throwable: Option[Throwable] = None
     for (i <- 0 until 2) {
       new Thread {
         override def run() {
-          val ans = nums.map(number => {
-            val running = ThreadingSuiteState.runningThreads
-            running.getAndIncrement()
-            val time = System.currentTimeMillis()
-            while (running.get() != 4 && System.currentTimeMillis() < time + 1000) {
-              Thread.sleep(100)
-            }
-            if (running.get() != 4) {
-              ThreadingSuiteState.failed.set(true)
-            }
-            number
-          }).collect()
-          assert(ans.toList === List(1, 2))
-          sem.release()
+          try {
+            val ans = nums.map(number => {
+              val running = ThreadingSuiteState.runningThreads
+              running.getAndIncrement()
+              val time = System.currentTimeMillis()
+              while (running.get() != 4 && System.currentTimeMillis() < time + 1000) {
+                Thread.sleep(100)
+              }
+              if (running.get() != 4) {
+                ThreadingSuiteState.failed.set(true)
+              }
+              number
+            }).collect()
+            assert(ans.toList === List(1, 2))
+          } catch {
+            case t: Throwable =>
+              throwable = Some(t)
+          } finally {
+            sem.release()
+          }
         }
       }.start()
     }
@@ -145,18 +152,25 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging {
                 ThreadingSuiteState.runningThreads.get() + "); failing test")
       fail("One or more threads didn't see runningThreads = 4")
     }
+    throwable.foreach { t => throw t }
   }
 
   test("set local properties in different thread") {
     sc = new SparkContext("local", "test")
     val sem = new Semaphore(0)
-
+    var throwable: Option[Throwable] = None
     val threads = (1 to 5).map { i =>
       new Thread() {
         override def run() {
-          sc.setLocalProperty("test", i.toString)
-          assert(sc.getLocalProperty("test") === i.toString)
-          sem.release()
+          try {
+            sc.setLocalProperty("test", i.toString)
+            assert(sc.getLocalProperty("test") === i.toString)
+          } catch {
+            case t: Throwable =>
+              throwable = Some(t)
+          } finally {
+            sem.release()
+          }
         }
       }
     }
@@ -165,20 +179,27 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging {
 
     sem.acquire(5)
     assert(sc.getLocalProperty("test") === null)
+    throwable.foreach { t => throw t }
   }
 
   test("set and get local properties in parent-children thread") {
     sc = new SparkContext("local", "test")
     sc.setLocalProperty("test", "parent")
     val sem = new Semaphore(0)
-
+    var throwable: Option[Throwable] = None
     val threads = (1 to 5).map { i =>
       new Thread() {
         override def run() {
-          assert(sc.getLocalProperty("test") === "parent")
-          sc.setLocalProperty("test", i.toString)
-          assert(sc.getLocalProperty("test") === i.toString)
-          sem.release()
+          try {
+            assert(sc.getLocalProperty("test") === "parent")
+            sc.setLocalProperty("test", i.toString)
+            assert(sc.getLocalProperty("test") === i.toString)
+          } catch {
+            case t: Throwable =>
+              throwable = Some(t)
+          } finally {
+            sem.release()
+          }
         }
       }
     }
@@ -188,6 +209,7 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging {
     sem.acquire(5)
     assert(sc.getLocalProperty("test") === "parent")
     assert(sc.getLocalProperty("Foo") === null)
+    throwable.foreach { t => throw t }
   }
 
   test("mutations to local properties should not affect submitted jobs 
(SPARK-6629)") {

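
Each of the three tests touched above now repeats the same capture-and-rethrow 
boilerplate: a local var throwable, a try/catch/finally inside run(), and a 
final throwable.foreach { t => throw t } after the semaphore is acquired. If 
the pattern keeps spreading, it could be factored into a small helper along the 
following lines (a hypothetical refactor, not part of this patch; the helper 
and object names are made up):

    import java.util.concurrent.ConcurrentLinkedQueue

    // Hypothetical helper, not from SPARK-10564: run each body on its own
    // thread, join them all, then rethrow the first captured failure so the
    // calling test itself fails.
    object ThreadTestUtils {
      def runThreadsAndPropagate(bodies: Seq[() => Unit]): Unit = {
        val failures = new ConcurrentLinkedQueue[Throwable]()
        val threads = bodies.map { body =>
          new Thread {
            override def run(): Unit = {
              try {
                body()
              } catch {
                case t: Throwable => failures.add(t)  // collect, don't swallow
              }
            }
          }
        }
        threads.foreach(_.start())
        threads.foreach(_.join())
        Option(failures.peek()).foreach { t => throw t }  // surface the first failure
      }
    }

One trade-off of the in-place fix versus such a helper: with a plain var 
throwable, a later failure overwrites an earlier one, whereas a queue keeps 
every failure. Either way the test fails, which is the point of SPARK-10564.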
