This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 0f2afd3 [SPARK-31354] SparkContext only register one SparkSession ApplicationEnd listener 0f2afd3 is described below commit 0f2afd3455e164dcd273f6c69774d08f7121af8d Author: Vinoo Ganesh <vinoo.gan...@gmail.com> AuthorDate: Thu May 21 16:06:28 2020 +0000 [SPARK-31354] SparkContext only register one SparkSession ApplicationEnd listener ## What changes were proposed in this pull request? This change was made as a result of the conversation on https://issues.apache.org/jira/browse/SPARK-31354 and is intended to continue work from that ticket here. This change fixes a memory leak where SparkSession listeners are never cleared off of the SparkContext listener bus. Before this PR, the following code: ``` SparkSession.builder().master("local").getOrCreate() SparkSession.clearActiveSession() SparkSession.clearDefaultSession() SparkSession.builder().master("local").getOrCreate() SparkSession.clearActiveSession() SparkSession.clearDefaultSession() ``` would result in a SparkContext with the following listeners on the listener bus: ``` [org.apache.spark.status.AppStatusListener@5f610071, org.apache.spark.HeartbeatReceiver@d400c17, org.apache.spark.sql.SparkSession$$anon$1@25849aeb, <-First instance org.apache.spark.sql.SparkSession$$anon$1@fadb9a0] <- Second instance ``` After this PR, the execution of the same code above results in a SparkContext with the following listeners on the listener bus: ``` [org.apache.spark.status.AppStatusListener@5f610071, org.apache.spark.HeartbeatReceiver@d400c17, org.apache.spark.sql.SparkSession$$anon$1@25849aeb] <-One instance ``` ## How was this patch tested? * Unit test included as a part of the PR Closes #28128 from vinooganesh/vinooganesh/SPARK-27958. 
Lead-authored-by: Vinoo Ganesh <vinoo.gan...@gmail.com> Co-authored-by: Vinoo Ganesh <vgan...@palantir.com> Co-authored-by: Vinoo Ganesh <vi...@safegraph.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> (cherry picked from commit dae79888dc6476892877d3b3b233381cdbf7fa74) Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../scala/org/apache/spark/sql/SparkSession.scala | 27 +++++++++++++--------- .../spark/sql/SparkSessionBuilderSuite.scala | 25 ++++++++++++++++++++ 2 files changed, 41 insertions(+), 11 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index be597ed..60a6037 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql import java.io.Closeable import java.util.concurrent.TimeUnit._ -import java.util.concurrent.atomic.AtomicReference +import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference} import scala.collection.JavaConverters._ import scala.reflect.runtime.universe.TypeTag @@ -49,7 +49,6 @@ import org.apache.spark.sql.types.{DataType, StructType} import org.apache.spark.sql.util.ExecutionListenerManager import org.apache.spark.util.{CallSite, Utils} - /** * The entry point to programming Spark with the Dataset and DataFrame API. * @@ -940,15 +939,7 @@ object SparkSession extends Logging { options.foreach { case (k, v) => session.initialSessionOptions.put(k, v) } setDefaultSession(session) setActiveSession(session) - - // Register a successfully instantiated context to the singleton. This should be at the - // end of the class definition so that the singleton is updated only if there is no - // exception in the construction of the instance. 
- sparkContext.addSparkListener(new SparkListener { - override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { - defaultSession.set(null) - } - }) + registerContextListener(sparkContext) } return session @@ -1064,6 +1055,20 @@ object SparkSession extends Logging { // Private methods from now on //////////////////////////////////////////////////////////////////////////////////////// + private val listenerRegistered: AtomicBoolean = new AtomicBoolean(false) + + /** Register the AppEnd listener onto the Context */ + private def registerContextListener(sparkContext: SparkContext): Unit = { + if (!listenerRegistered.get()) { + sparkContext.addSparkListener(new SparkListener { + override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { + defaultSession.set(null) + } + }) + listenerRegistered.set(true) + } + } + /** The active SparkSession for the current thread. */ private val activeThreadSession = new InheritableThreadLocal[SparkSession] diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala index 7b76d07..0a522fd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala @@ -169,6 +169,31 @@ class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach { assert(session.sessionState.conf.getConf(GLOBAL_TEMP_DATABASE) === "globaltempdb-spark-31234") } + test("SPARK-31354: SparkContext only register one SparkSession ApplicationEnd listener") { + val conf = new SparkConf() + .setMaster("local") + .setAppName("test-app-SPARK-31354-1") + val context = new SparkContext(conf) + SparkSession + .builder() + .sparkContext(context) + .master("local") + .getOrCreate() + val postFirstCreation = context.listenerBus.listeners.size() + SparkSession.clearActiveSession() + 
SparkSession.clearDefaultSession() + + SparkSession + .builder() + .sparkContext(context) + .master("local") + .getOrCreate() + val postSecondCreation = context.listenerBus.listeners.size() + SparkSession.clearActiveSession() + SparkSession.clearDefaultSession() + assert(postFirstCreation == postSecondCreation) + } + test("SPARK-31532: should not propagate static sql configs to the existing" + " active/default SparkSession") { val session = SparkSession.builder() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org