Repository: spark Updated Branches: refs/heads/master 6287c94f0 -> 303f00a4b
[SPARK-19481] [REPL] [MAVEN] Avoid to leak SparkContext in Signaling.cancelOnInterrupt ## What changes were proposed in this pull request? `Signaling.cancelOnInterrupt` leaks a SparkContext per call and it makes ReplSuite unstable. This PR adds `SparkContext.getActive` to allow `Signaling.cancelOnInterrupt` to get the active `SparkContext` to avoid the leak. ## How was this patch tested? Jenkins Author: Shixiong Zhu <shixi...@databricks.com> Closes #16825 from zsxwing/SPARK-19481. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/303f00a4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/303f00a4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/303f00a4 Branch: refs/heads/master Commit: 303f00a4bf6660dd83c8bd9e3a107bb3438a421b Parents: 6287c94 Author: Shixiong Zhu <shixi...@databricks.com> Authored: Thu Feb 9 11:16:51 2017 -0800 Committer: Davies Liu <davies....@gmail.com> Committed: Thu Feb 9 11:16:51 2017 -0800 ---------------------------------------------------------------------- .../scala/org/apache/spark/SparkContext.scala | 7 +++++++ .../main/scala/org/apache/spark/repl/Main.scala | 1 + .../org/apache/spark/repl/SparkILoop.scala | 1 - .../main/scala/org/apache/spark/repl/Main.scala | 2 +- .../scala/org/apache/spark/repl/Signaling.scala | 20 +++++++++++--------- 5 files changed, 20 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/303f00a4/core/src/main/scala/org/apache/spark/SparkContext.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 40189a2..eb13686 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -2489,6 +2489,13 @@ object SparkContext extends Logging { } } + /** Return the current active [[SparkContext]] if any. */ + private[spark] def getActive: Option[SparkContext] = { + SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized { + Option(activeContext.get()) + } + } + /** * Called at the beginning of the SparkContext constructor to ensure that no SparkContext is * running. Throws an exception if a running context is detected and logs a warning if another http://git-wip-us.apache.org/repos/asf/spark/blob/303f00a4/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala ---------------------------------------------------------------------- diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala index 7b4e14b..fba321b 100644 --- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala +++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala @@ -22,6 +22,7 @@ import org.apache.spark.internal.Logging object Main extends Logging { initializeLogIfNecessary(true) + Signaling.cancelOnInterrupt() private var _interp: SparkILoop = _ http://git-wip-us.apache.org/repos/asf/spark/blob/303f00a4/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala ---------------------------------------------------------------------- diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala index e017aa4..b7237a6 100644 --- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala +++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala @@ -1027,7 +1027,6 @@ class SparkILoop( builder.getOrCreate() } sparkContext = sparkSession.sparkContext - Signaling.cancelOnInterrupt(sparkContext) sparkSession } http://git-wip-us.apache.org/repos/asf/spark/blob/303f00a4/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala ---------------------------------------------------------------------- diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala index fec4d49..7f2ec01 100644 --- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala +++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala @@ -30,6 +30,7 @@ import org.apache.spark.util.Utils object Main extends Logging { initializeLogIfNecessary(true) + Signaling.cancelOnInterrupt() val conf = new SparkConf() val rootDir = conf.getOption("spark.repl.classdir").getOrElse(Utils.getLocalDir(conf)) @@ -108,7 +109,6 @@ object Main extends Logging { logInfo("Created Spark session") } sparkContext = sparkSession.sparkContext - Signaling.cancelOnInterrupt(sparkContext) sparkSession } http://git-wip-us.apache.org/repos/asf/spark/blob/303f00a4/repl/src/main/scala/org/apache/spark/repl/Signaling.scala ---------------------------------------------------------------------- diff --git a/repl/src/main/scala/org/apache/spark/repl/Signaling.scala b/repl/src/main/scala/org/apache/spark/repl/Signaling.scala index 202febf..9577e0e 100644 --- a/repl/src/main/scala/org/apache/spark/repl/Signaling.scala +++ b/repl/src/main/scala/org/apache/spark/repl/Signaling.scala @@ -28,15 +28,17 @@ private[repl] object Signaling extends Logging { * when no jobs are currently running. * This makes it possible to interrupt a running shell job by pressing Ctrl+C. */ - def cancelOnInterrupt(ctx: SparkContext): Unit = SignalUtils.register("INT") { - if (!ctx.statusTracker.getActiveJobIds().isEmpty) { - logWarning("Cancelling all active jobs, this can take a while. " + - "Press Ctrl+C again to exit now.") - ctx.cancelAllJobs() - true - } else { - false - } + def cancelOnInterrupt(): Unit = SignalUtils.register("INT") { + SparkContext.getActive.map { ctx => + if (!ctx.statusTracker.getActiveJobIds().isEmpty) { + logWarning("Cancelling all active jobs, this can take a while. " + + "Press Ctrl+C again to exit now.") + ctx.cancelAllJobs() + true + } else { + false + } + }.getOrElse(false) } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org