Repository: spark
Updated Branches:
  refs/heads/master 6287c94f0 -> 303f00a4b


[SPARK-19481] [REPL] [MAVEN] Avoid to leak SparkContext in 
Signaling.cancelOnInterrupt

## What changes were proposed in this pull request?

`Signaling.cancelOnInterrupt` leaks a SparkContext per call and it makes 
ReplSuite unstable.

This PR adds `SparkContext.getActive` to allow `Signaling.cancelOnInterrupt` to 
get the active `SparkContext` to avoid the leak.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #16825 from zsxwing/SPARK-19481.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/303f00a4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/303f00a4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/303f00a4

Branch: refs/heads/master
Commit: 303f00a4bf6660dd83c8bd9e3a107bb3438a421b
Parents: 6287c94
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Thu Feb 9 11:16:51 2017 -0800
Committer: Davies Liu <davies....@gmail.com>
Committed: Thu Feb 9 11:16:51 2017 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |  7 +++++++
 .../main/scala/org/apache/spark/repl/Main.scala |  1 +
 .../org/apache/spark/repl/SparkILoop.scala      |  1 -
 .../main/scala/org/apache/spark/repl/Main.scala |  2 +-
 .../scala/org/apache/spark/repl/Signaling.scala | 20 +++++++++++---------
 5 files changed, 20 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/303f00a4/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 40189a2..eb13686 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2489,6 +2489,13 @@ object SparkContext extends Logging {
     }
   }
 
+  /** Return the current active [[SparkContext]] if any. */
+  private[spark] def getActive: Option[SparkContext] = {
+    SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
+      Option(activeContext.get())
+    }
+  }
+
   /**
    * Called at the beginning of the SparkContext constructor to ensure that no 
SparkContext is
    * running.  Throws an exception if a running context is detected and logs a 
warning if another

http://git-wip-us.apache.org/repos/asf/spark/blob/303f00a4/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala 
b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala
index 7b4e14b..fba321b 100644
--- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala
@@ -22,6 +22,7 @@ import org.apache.spark.internal.Logging
 object Main extends Logging {
 
   initializeLogIfNecessary(true)
+  Signaling.cancelOnInterrupt()
 
   private var _interp: SparkILoop = _
 

http://git-wip-us.apache.org/repos/asf/spark/blob/303f00a4/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
----------------------------------------------------------------------
diff --git 
a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala 
b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index e017aa4..b7237a6 100644
--- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -1027,7 +1027,6 @@ class SparkILoop(
       builder.getOrCreate()
     }
     sparkContext = sparkSession.sparkContext
-    Signaling.cancelOnInterrupt(sparkContext)
     sparkSession
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/303f00a4/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala 
b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
index fec4d49..7f2ec01 100644
--- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
+++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
@@ -30,6 +30,7 @@ import org.apache.spark.util.Utils
 object Main extends Logging {
 
   initializeLogIfNecessary(true)
+  Signaling.cancelOnInterrupt()
 
   val conf = new SparkConf()
   val rootDir = 
conf.getOption("spark.repl.classdir").getOrElse(Utils.getLocalDir(conf))
@@ -108,7 +109,6 @@ object Main extends Logging {
       logInfo("Created Spark session")
     }
     sparkContext = sparkSession.sparkContext
-    Signaling.cancelOnInterrupt(sparkContext)
     sparkSession
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/303f00a4/repl/src/main/scala/org/apache/spark/repl/Signaling.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/org/apache/spark/repl/Signaling.scala 
b/repl/src/main/scala/org/apache/spark/repl/Signaling.scala
index 202febf..9577e0e 100644
--- a/repl/src/main/scala/org/apache/spark/repl/Signaling.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/Signaling.scala
@@ -28,15 +28,17 @@ private[repl] object Signaling extends Logging {
    * when no jobs are currently running.
    * This makes it possible to interrupt a running shell job by pressing 
Ctrl+C.
    */
-  def cancelOnInterrupt(ctx: SparkContext): Unit = SignalUtils.register("INT") 
{
-    if (!ctx.statusTracker.getActiveJobIds().isEmpty) {
-      logWarning("Cancelling all active jobs, this can take a while. " +
-        "Press Ctrl+C again to exit now.")
-      ctx.cancelAllJobs()
-      true
-    } else {
-      false
-    }
+  def cancelOnInterrupt(): Unit = SignalUtils.register("INT") {
+    SparkContext.getActive.map { ctx =>
+      if (!ctx.statusTracker.getActiveJobIds().isEmpty) {
+        logWarning("Cancelling all active jobs, this can take a while. " +
+          "Press Ctrl+C again to exit now.")
+        ctx.cancelAllJobs()
+        true
+      } else {
+        false
+      }
+    }.getOrElse(false)
   }
 
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to