Repository: spark
Updated Branches:
  refs/heads/master 645cf3fcc -> 2c3f83c34


[SPARK-4012] stop SparkContext when the exception is thrown from an infinite 
loop

https://issues.apache.org/jira/browse/SPARK-4012

This patch is a resubmission for https://github.com/apache/spark/pull/2864

What I am proposing in this patch is that ***when the exception is thrown from 
an infinite loop, we should stop the SparkContext, instead of let JVM throws 
exception forever***

So, in the infinite loops where we originally wrapped with a ` 
logUncaughtExceptions`, I changed to `tryOrStopSparkContext`, so that the Spark 
component is stopped

Early stopped JVM process is helpful for HA scheme design, for example,

The user has a script checking the existence of the pid of the Spark Streaming 
driver for monitoring the availability; with the code before this patch, the 
JVM process is still available but not functional when the exceptions are thrown

andrewor14, srowen , mind taking further consideration about the change?

Author: CodingCat <[email protected]>

Closes #5004 from CodingCat/SPARK-4012-1 and squashes the following commits:

589276a [CodingCat] throw fatal error again
3c72cd8 [CodingCat] address the comments
6087864 [CodingCat] revise comments
6ad3eb0 [CodingCat] stop SparkContext instead of quit the JVM process
6322959 [CodingCat] exit JVM process when the exception is thrown from an 
infinite loop


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2c3f83c3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2c3f83c3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2c3f83c3

Branch: refs/heads/master
Commit: 2c3f83c34bb8d2c1bf13b33633d8c5a8089545d1
Parents: 645cf3f
Author: CodingCat <[email protected]>
Authored: Wed Mar 18 23:48:45 2015 -0700
Committer: Aaron Davidson <[email protected]>
Committed: Wed Mar 18 23:48:45 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/ContextCleaner.scala |  2 +-
 .../scala/org/apache/spark/SparkContext.scala   |  2 +-
 .../deploy/history/FsHistoryProvider.scala      |  2 +-
 .../spark/scheduler/TaskSchedulerImpl.scala     |  2 +-
 .../spark/util/AsynchronousListenerBus.scala    | 10 +++++--
 .../scala/org/apache/spark/util/Utils.scala     | 28 ++++++++++++++++++++
 .../scheduler/EventLoggingListenerSuite.scala   |  9 ++++---
 .../spark/scheduler/SparkListenerSuite.scala    | 10 +++----
 .../streaming/scheduler/JobScheduler.scala      |  2 +-
 9 files changed, 51 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2c3f83c3/core/src/main/scala/org/apache/spark/ContextCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala 
b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index 0c59a61..9b05c96 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -145,7 +145,7 @@ private[spark] class ContextCleaner(sc: SparkContext) 
extends Logging {
   }
 
   /** Keep cleaning RDD, shuffle, and broadcast state. */
-  private def keepCleaning(): Unit = Utils.logUncaughtExceptions {
+  private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
     while (!stopped) {
       try {
         val reference = 
Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))

http://git-wip-us.apache.org/repos/asf/spark/blob/2c3f83c3/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 4457f40..228ff71 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1736,7 +1736,7 @@ class SparkContext(config: SparkConf) extends Logging 
with ExecutorAllocationCli
         }
     }
 
-    listenerBus.start()
+    listenerBus.start(this)
   }
 
   /** Post the application start event */

http://git-wip-us.apache.org/repos/asf/spark/blob/2c3f83c3/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 16d88c1..7fde020 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -93,7 +93,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) 
extends ApplicationHis
    */
   private def getRunner(operateFun: () => Unit): Runnable = {
     new Runnable() {
-      override def run() = Utils.logUncaughtExceptions {
+      override def run() = Utils.tryOrExit {
         operateFun()
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/2c3f83c3/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 7a9cf1c..f33fd44 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -145,7 +145,7 @@ private[spark] class TaskSchedulerImpl(
       import sc.env.actorSystem.dispatcher
       sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
             SPECULATION_INTERVAL milliseconds) {
-        Utils.tryOrExit { checkSpeculatableTasks() }
+        Utils.tryOrStopSparkContext(sc) { checkSpeculatableTasks() }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/2c3f83c3/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala 
b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
index 18c627e..ce7887b 100644
--- a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
@@ -21,6 +21,7 @@ import java.util.concurrent._
 import java.util.concurrent.atomic.AtomicBoolean
 
 import com.google.common.annotations.VisibleForTesting
+import org.apache.spark.SparkContext
 
 /**
  * Asynchronously passes events to registered listeners.
@@ -38,6 +39,8 @@ private[spark] abstract class AsynchronousListenerBus[L <: 
AnyRef, E](name: Stri
 
   self =>
 
+  private var sparkContext: SparkContext = null
+  
   /* Cap the capacity of the event queue so we get an explicit error (rather 
than
    * an OOM exception) if it's perpetually being added to more quickly than 
it's being drained. */
   private val EVENT_QUEUE_CAPACITY = 10000
@@ -57,7 +60,7 @@ private[spark] abstract class AsynchronousListenerBus[L <: 
AnyRef, E](name: Stri
 
   private val listenerThread = new Thread(name) {
     setDaemon(true)
-    override def run(): Unit = Utils.logUncaughtExceptions {
+    override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {
       while (true) {
         eventLock.acquire()
         self.synchronized {
@@ -89,9 +92,12 @@ private[spark] abstract class AsynchronousListenerBus[L <: 
AnyRef, E](name: Stri
    * This first sends out all buffered events posted before this listener bus 
has started, then
    * listens for any additional events asynchronously while the listener bus 
is still running.
    * This should only be called once.
+   *
+   * @param sc Used to stop the SparkContext in case the listener thread dies.
    */
-  def start() {
+  def start(sc: SparkContext) {
     if (started.compareAndSet(false, true)) {
+      sparkContext = sc
       listenerThread.start()
     } else {
       throw new IllegalStateException(s"$name already started!")

http://git-wip-us.apache.org/repos/asf/spark/blob/2c3f83c3/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index af8a245..91aa708 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1146,6 +1146,8 @@ private[spark] object Utils extends Logging {
   /**
    * Execute a block of code that evaluates to Unit, forwarding any uncaught 
exceptions to the
    * default UncaughtExceptionHandler
+   * 
+   * NOTE: This method is to be called by the spark-started JVM process.
    */
   def tryOrExit(block: => Unit) {
     try {
@@ -1157,6 +1159,32 @@ private[spark] object Utils extends Logging {
   }
 
   /**
+   * Execute a block of code that evaluates to Unit, stop SparkContext is 
there is any uncaught 
+   * exception
+   *  
+   * NOTE: This method is to be called by the driver-side components to avoid 
stopping the 
+   * user-started JVM process completely; in contrast, tryOrExit is to be 
called in the 
+   * spark-started JVM process .
+   */
+  def tryOrStopSparkContext(sc: SparkContext)(block: => Unit) {
+    try {
+      block
+    } catch {
+      case e: ControlThrowable => throw e
+      case t: Throwable =>
+        val currentThreadName = Thread.currentThread().getName
+        if (sc != null) {
+          logError(s"uncaught error in thread $currentThreadName, stopping 
SparkContext", t)
+          sc.stop()
+        }
+        if (!NonFatal(t)) {
+          logError(s"throw uncaught fatal error in thread $currentThreadName", 
t)
+          throw t
+        }
+    }
+  }
+
+  /**
    * Execute a block of code that evaluates to Unit, re-throwing any non-fatal 
uncaught
    * exceptions as IOException.  This is used when implementing Externalizable 
and Serializable's
    * read and write methods, since Java's serializer will not report 
non-IOExceptions properly;

http://git-wip-us.apache.org/repos/asf/spark/blob/2c3f83c3/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 992dde6..448258a 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -25,9 +25,9 @@ import scala.io.Source
 
 import org.apache.hadoop.fs.Path
 import org.json4s.jackson.JsonMethods._
-import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.{FunSuiteLike, BeforeAndAfter, FunSuite}
 
-import org.apache.spark.{Logging, SparkConf, SparkContext, SPARK_VERSION}
+import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.io._
 import org.apache.spark.util.{JsonProtocol, Utils}
@@ -39,7 +39,8 @@ import org.apache.spark.util.{JsonProtocol, Utils}
  * logging events, whether the parsing of the file names is correct, and 
whether the logged events
  * can be read and deserialized into actual SparkListenerEvents.
  */
-class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with 
Logging {
+class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with 
BeforeAndAfter
+  with Logging {
   import EventLoggingListenerSuite._
 
   private val fileSystem = Utils.getHadoopFileSystem("/",
@@ -144,7 +145,7 @@ class EventLoggingListenerSuite extends FunSuite with 
BeforeAndAfter with Loggin
 
     // A comprehensive test on JSON de/serialization of all events is in 
JsonProtocolSuite
     eventLogger.start()
-    listenerBus.start()
+    listenerBus.start(sc)
     listenerBus.addListener(eventLogger)
     listenerBus.postToAll(applicationStart)
     listenerBus.postToAll(applicationEnd)

http://git-wip-us.apache.org/repos/asf/spark/blob/2c3f83c3/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 3a41ee8..627c9a4 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -46,7 +46,7 @@ class SparkListenerSuite extends FunSuite with 
LocalSparkContext with Matchers
     assert(counter.count === 0)
 
     // Starting listener bus should flush all buffered events
-    bus.start()
+    bus.start(sc)
     assert(bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
     assert(counter.count === 5)
 
@@ -58,8 +58,8 @@ class SparkListenerSuite extends FunSuite with 
LocalSparkContext with Matchers
     // Listener bus must not be started twice
     intercept[IllegalStateException] {
       val bus = new LiveListenerBus
-      bus.start()
-      bus.start()
+      bus.start(sc)
+      bus.start(sc)
     }
 
     // ... or stopped before starting
@@ -96,7 +96,7 @@ class SparkListenerSuite extends FunSuite with 
LocalSparkContext with Matchers
     val blockingListener = new BlockingListener
 
     bus.addListener(blockingListener)
-    bus.start()
+    bus.start(sc)
     bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
 
     listenerStarted.acquire()
@@ -347,7 +347,7 @@ class SparkListenerSuite extends FunSuite with 
LocalSparkContext with Matchers
     bus.addListener(badListener)
     bus.addListener(jobCounter1)
     bus.addListener(jobCounter2)
-    bus.start()
+    bus.start(sc)
 
     // Post events to all listeners, and wait until the queue is drained
     (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, 
JobSucceeded)) }

http://git-wip-us.apache.org/repos/asf/spark/blob/2c3f83c3/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index b3ffc71..60bc099 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -61,7 +61,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging 
{
       }
     }), "JobScheduler")
 
-    listenerBus.start()
+    listenerBus.start(ssc.sparkContext)
     receiverTracker = new ReceiverTracker(ssc)
     receiverTracker.start()
     jobGenerator.start()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to