Repository: spark
Updated Branches:
  refs/heads/master 8e1c00dbf -> 5d45e1f60


[SPARK-3090] [CORE] Stop SparkContext if user forgets to.

Set up a shutdown hook to try to stop the Spark context in
case the user forgets to do it. The main effect is that any
open log files are flushed and closed, which is particularly
interesting for event logs.
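
For illustration, here is a minimal sketch of the situation this change
addresses. The application name and the job are made up for the example;
only the SparkConf/SparkContext calls are real APIs.

    import org.apache.spark.{SparkConf, SparkContext}

    // Hypothetical driver that never calls sc.stop() explicitly. With this
    // change, a shutdown hook registered during SparkContext initialization
    // invokes stop() on clean JVM exit, so the event log is flushed and
    // closed instead of being left unfinished. It cannot help if the JVM
    // is killed.
    object ForgetfulApp {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("forgetful-app")
          .set("spark.eventLog.enabled", "true")
        val sc = new SparkContext(conf)
        val sum = sc.parallelize(1 to 100).reduce(_ + _)
        println(s"sum = $sum")
        // No sc.stop() here.
      }
    }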

Author: Marcelo Vanzin <van...@cloudera.com>

Closes #5696 from vanzin/SPARK-3090 and squashes the following commits:

3b554b5 [Marcelo Vanzin] [SPARK-3090] [core] Stop SparkContext if user forgets to.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5d45e1f6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5d45e1f6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5d45e1f6

Branch: refs/heads/master
Commit: 5d45e1f60059e2f2fc8ad64778b9ddcc8887c570
Parents: 8e1c00d
Author: Marcelo Vanzin <van...@cloudera.com>
Authored: Mon Apr 27 19:46:17 2015 -0400
Committer: Sean Owen <so...@cloudera.com>
Committed: Mon Apr 27 19:46:17 2015 -0400

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   | 38 +++++++++++++-------
 .../scala/org/apache/spark/util/Utils.scala     | 10 ++++--
 .../spark/deploy/yarn/ApplicationMaster.scala   | 10 ++----
 3 files changed, 35 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5d45e1f6/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index ea4ddcc..65b903a 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -223,6 +223,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   private var _listenerBusStarted: Boolean = false
   private var _jars: Seq[String] = _
   private var _files: Seq[String] = _
+  private var _shutdownHookRef: AnyRef = _
 
   /* ------------------------------------------------------------------------------------- *
    | Accessors and public fields. These provide access to the internal state of the        |
@@ -517,6 +518,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     _taskScheduler.postStartHook()
     _env.metricsSystem.registerSource(new DAGSchedulerSource(dagScheduler))
     _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
+
+    // Make sure the context is stopped if the user forgets about it. This avoids leaving
+    // unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
+    // is killed, though.
+    _shutdownHookRef = Utils.addShutdownHook(Utils.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
+      logInfo("Invoking stop() from shutdown hook")
+      stop()
+    }
   } catch {
     case NonFatal(e) =>
       logError("Error initializing SparkContext.", e)
@@ -1481,6 +1490,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       logInfo("SparkContext already stopped.")
       return
     }
+    if (_shutdownHookRef != null) {
+      Utils.removeShutdownHook(_shutdownHookRef)
+    }
 
     postApplicationEnd()
     _ui.foreach(_.stop())
@@ -1891,7 +1903,7 @@ object SparkContext extends Logging {
    *
    * Access to this field is guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK.
    */
-  private val activeContext: AtomicReference[SparkContext] = 
+  private val activeContext: AtomicReference[SparkContext] =
     new AtomicReference[SparkContext](null)
 
   /**
@@ -1944,11 +1956,11 @@ object SparkContext extends Logging {
   }
 
   /**
-   * This function may be used to get or instantiate a SparkContext and register it as a 
-   * singleton object. Because we can only have one active SparkContext per JVM, 
-   * this is useful when applications may wish to share a SparkContext. 
+   * This function may be used to get or instantiate a SparkContext and register it as a
+   * singleton object. Because we can only have one active SparkContext per JVM,
+   * this is useful when applications may wish to share a SparkContext.
    *
-   * Note: This function cannot be used to create multiple SparkContext instances 
+   * Note: This function cannot be used to create multiple SparkContext instances
    * even if multiple contexts are allowed.
    */
   def getOrCreate(config: SparkConf): SparkContext = {
@@ -1961,17 +1973,17 @@ object SparkContext extends Logging {
       activeContext.get()
     }
   }
-  
+
   /**
-   * This function may be used to get or instantiate a SparkContext and register it as a 
-   * singleton object. Because we can only have one active SparkContext per JVM, 
+   * This function may be used to get or instantiate a SparkContext and register it as a
+   * singleton object. Because we can only have one active SparkContext per JVM,
    * this is useful when applications may wish to share a SparkContext.
-   * 
+   *
    * This method allows not passing a SparkConf (useful if just retrieving).
-   * 
-   * Note: This function cannot be used to create multiple SparkContext instances 
-   * even if multiple contexts are allowed. 
-   */ 
+   *
+   * Note: This function cannot be used to create multiple SparkContext instances
+   * even if multiple contexts are allowed.
+   */
   def getOrCreate(): SparkContext = {
     getOrCreate(new SparkConf())
   }
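
The stop() change above follows a register-then-deregister idiom: keep the
handle returned when the hook is added, and remove the hook once the context
is stopped explicitly so the cleanup does not run twice. A self-contained
sketch of that idiom using the plain JVM shutdown-hook API (illustrative
only, not Spark's hook manager):

    // Keep the hook handle and deregister it on explicit stop.
    class ManagedResource {
      @volatile private var stopped = false

      private val hook = new Thread(new Runnable {
        override def run(): Unit = stop()
      })
      Runtime.getRuntime.addShutdownHook(hook)

      def stop(): Unit = synchronized {
        if (!stopped) {
          stopped = true
          // removeShutdownHook throws IllegalStateException if the JVM is
          // already shutting down, i.e. stop() was invoked from the hook.
          try Runtime.getRuntime.removeShutdownHook(hook)
          catch { case _: IllegalStateException => }
          println("flushing and closing logs")
        }
      }
    }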

http://git-wip-us.apache.org/repos/asf/spark/blob/5d45e1f6/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index c6c6df7..342bc9a 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -67,6 +67,12 @@ private[spark] object Utils extends Logging {
 
   val DEFAULT_SHUTDOWN_PRIORITY = 100
 
+  /**
+   * The shutdown priority of the SparkContext instance. This is lower than the default
+   * priority, so that by default hooks are run before the context is shut down.
+   */
+  val SPARK_CONTEXT_SHUTDOWN_PRIORITY = 50
+
   private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
   @volatile private var localRootDirs: Array[String] = null
 
@@ -2116,7 +2122,7 @@ private[spark] object Utils extends Logging {
    * @return A handle that can be used to unregister the shutdown hook.
    */
   def addShutdownHook(hook: () => Unit): AnyRef = {
-    addShutdownHook(DEFAULT_SHUTDOWN_PRIORITY, hook)
+    addShutdownHook(DEFAULT_SHUTDOWN_PRIORITY)(hook)
   }
 
   /**
@@ -2126,7 +2132,7 @@ private[spark] object Utils extends Logging {
    * @param hook The code to run during shutdown.
    * @return A handle that can be used to unregister the shutdown hook.
    */
-  def addShutdownHook(priority: Int, hook: () => Unit): AnyRef = {
+  def addShutdownHook(priority: Int)(hook: () => Unit): AnyRef = {
     shutdownHooks.add(priority, hook)
   }
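
The signature change above curries the priority so that the hook body can be
passed as a block. For illustration, a call site inside Spark using the new
form might look like the following; the log message is made up, and logInfo
assumes the enclosing class mixes in Logging:

    // Register with an explicit priority; keep the returned handle so the
    // hook can be removed once the cleanup has already run.
    val hookRef: AnyRef = Utils.addShutdownHook(Utils.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
      logInfo("Running cleanup from shutdown hook")
    }
    // ... later, after the cleanup has happened explicitly:
    Utils.removeShutdownHook(hookRef)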
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5d45e1f6/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 93ae451..70cb57f 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -95,14 +95,8 @@ private[spark] class ApplicationMaster(
 
       val fs = FileSystem.get(yarnConf)
 
-      Utils.addShutdownHook { () =>
-        // If the SparkContext is still registered, shut it down as a best case effort in case
-        // users do not call sc.stop or do System.exit().
-        val sc = sparkContextRef.get()
-        if (sc != null) {
-          logInfo("Invoking sc stop from shutdown hook")
-          sc.stop()
-        }
+      // This shutdown hook should run *after* the SparkContext is shut down.
+      Utils.addShutdownHook(Utils.SPARK_CONTEXT_SHUTDOWN_PRIORITY - 1) { () =>
         val maxAppAttempts = client.getMaxRegAttempts(sparkConf, yarnConf)
         val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
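
For reference, a toy sketch of the priority ordering this hook relies on,
where higher-priority hooks run first (illustrative only, not Spark's
implementation):

    import scala.collection.mutable

    // Higher priority runs first, so a hook at priority 49
    // (SPARK_CONTEXT_SHUTDOWN_PRIORITY - 1) runs after the SparkContext
    // hook (50), and default-priority hooks (100) run before both.
    object HookOrderingSketch {
      private val hooks = mutable.Buffer.empty[(Int, () => Unit)]

      def add(priority: Int)(hook: () => Unit): Unit = hooks += ((priority, hook))

      def runAll(): Unit =
        hooks.sortBy { case (p, _) => -p }.foreach { case (_, h) => h() }

      def main(args: Array[String]): Unit = {
        add(100) { () => println("default-priority cleanup") }
        add(50)  { () => println("SparkContext.stop()") }
        add(49)  { () => println("unregister the ApplicationMaster") }
        runAll()  // prints in the order listed above
      }
    }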
 


