Repository: spark
Updated Branches:
  refs/heads/master 19530da69 -> 81012546e


[SPARK-11872] Prevent the call to SparkContext#stop() in the listener bus's 
thread

This is a continuation of SPARK-11761

Andrew suggested adding this protection. See the tail of 
https://github.com/apache/spark/pull/9741

Author: tedyu <yuzhih...@gmail.com>

Closes #9852 from tedyu/master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/81012546
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/81012546
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/81012546

Branch: refs/heads/master
Commit: 81012546ee5a80d2576740af0dad067b0f5962c5
Parents: 19530da
Author: tedyu <yuzhih...@gmail.com>
Authored: Tue Nov 24 12:22:33 2015 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Tue Nov 24 12:22:33 2015 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |  4 +++
 .../spark/scheduler/SparkListenerSuite.scala    | 31 ++++++++++++++++++++
 2 files changed, 35 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/81012546/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index b153a7b..e19ba11 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1694,6 +1694,10 @@ class SparkContext(config: SparkConf) extends Logging 
with ExecutorAllocationCli
 
   // Shut down the SparkContext.
   def stop() {
+    if (AsynchronousListenerBus.withinListenerThread.value) {
+      throw new SparkException("Cannot stop SparkContext within listener 
thread of" +
+        " AsynchronousListenerBus")
+    }
     // Use the stopping variable to ensure no contention for the stop scenario.
     // Still track the stopped variable for use elsewhere in the code.
     if (!stopped.compareAndSet(false, true)) {

http://git-wip-us.apache.org/repos/asf/spark/blob/81012546/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 84e5458..f20d5be 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
 
 import org.scalatest.Matchers
 
+import org.apache.spark.SparkException
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.util.ResetSystemProperties
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, 
SparkFunSuite}
@@ -36,6 +37,21 @@ class SparkListenerSuite extends SparkFunSuite with 
LocalSparkContext with Match
 
   val jobCompletionTime = 1421191296660L
 
+  test("don't call sc.stop in listener") {
+    sc = new SparkContext("local", "SparkListenerSuite")
+    val listener = new SparkContextStoppingListener(sc)
+    val bus = new LiveListenerBus
+    bus.addListener(listener)
+
+    // Starting listener bus should flush all buffered events
+    bus.start(sc)
+    bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
+    bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+
+    bus.stop()
+    assert(listener.sparkExSeen)
+  }
+
   test("basic creation and shutdown of LiveListenerBus") {
     val counter = new BasicJobCounter
     val bus = new LiveListenerBus
@@ -443,6 +459,21 @@ private class BasicJobCounter extends SparkListener {
   override def onJobEnd(job: SparkListenerJobEnd): Unit = count += 1
 }
 
+/**
+ * A simple listener that tries to stop SparkContext.
+ */
+private class SparkContextStoppingListener(val sc: SparkContext) extends 
SparkListener {
+  @volatile var sparkExSeen = false
+  override def onJobEnd(job: SparkListenerJobEnd): Unit = {
+    try {
+      sc.stop()
+    } catch {
+      case se: SparkException =>
+        sparkExSeen = true
+    }
+  }
+}
+
 private class ListenerThatAcceptsSparkConf(conf: SparkConf) extends 
SparkListener {
   var count = 0
   override def onJobEnd(job: SparkListenerJobEnd): Unit = count += 1


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to