Repository: spark
Updated Branches:
  refs/heads/master 1035aaa61 -> 359375eff


[SPARK-23809][SQL] Active SparkSession should be set by getOrCreate

## What changes were proposed in this pull request?

Currently, the active Spark session is set inconsistently (e.g., in 
createDataFrame, prior to query execution). Many places in Spark also 
incorrectly query the active session when they should be calling 
activeSession.getOrElse(defaultSession), and so might get None even if a Spark 
session exists.

The semantics here can be cleaned up by also setting the active session 
whenever the default session is set.

Related: https://github.com/apache/spark/pull/20926/files

## How was this patch tested?

Unit test, existing test. Note that if 
https://github.com/apache/spark/pull/20926 merges first we should also update 
the tests there.

Author: Eric Liang <e...@databricks.com>

Closes #20927 from ericl/active-session-cleanup.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/359375ef
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/359375ef
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/359375ef

Branch: refs/heads/master
Commit: 359375eff74630c9f0ea5a90ab7d45bf1b281ed0
Parents: 1035aaa
Author: Eric Liang <e...@databricks.com>
Authored: Tue Apr 3 17:09:12 2018 -0700
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Tue Apr 3 17:09:12 2018 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/SparkSession.scala | 14 +++++++++++++-
 .../spark/sql/SparkSessionBuilderSuite.scala      | 18 ++++++++++++++++++
 .../apache/spark/sql/test/TestSQLContext.scala    |  1 +
 .../org/apache/spark/sql/hive/test/TestHive.scala |  3 +++
 4 files changed, 35 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/359375ef/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 734573b..b107492 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -951,7 +951,8 @@ object SparkSession {
 
         session = new SparkSession(sparkContext, None, None, extensions)
         options.foreach { case (k, v) => session.initialSessionOptions.put(k, 
v) }
-        defaultSession.set(session)
+        setDefaultSession(session)
+        setActiveSession(session)
 
         // Register a successfully instantiated context to the singleton. This 
should be at the
         // end of the class definition so that the singleton is updated only 
if there is no
@@ -1027,6 +1028,17 @@ object SparkSession {
    */
   def getDefaultSession: Option[SparkSession] = Option(defaultSession.get)
 
+  /**
+   * Returns the currently active SparkSession, otherwise the default one. If 
there is no default
+   * SparkSession, throws an exception.
+   *
+   * @since 2.4.0
+   */
+  def active: SparkSession = {
+    getActiveSession.getOrElse(getDefaultSession.getOrElse(
+      throw new IllegalStateException("No active or default Spark session 
found")))
+  }
+
   
////////////////////////////////////////////////////////////////////////////////////////
   // Private methods from now on
   
////////////////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/spark/blob/359375ef/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
index c0301f2..44bf862 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
@@ -50,6 +50,24 @@ class SparkSessionBuilderSuite extends SparkFunSuite with 
BeforeAndAfterEach {
     assert(SparkSession.builder().getOrCreate() == session)
   }
 
+  test("sets default and active session") {
+    assert(SparkSession.getDefaultSession == None)
+    assert(SparkSession.getActiveSession == None)
+    val session = SparkSession.builder().master("local").getOrCreate()
+    assert(SparkSession.getDefaultSession == Some(session))
+    assert(SparkSession.getActiveSession == Some(session))
+  }
+
+  test("get active or default session") {
+    val session = SparkSession.builder().master("local").getOrCreate()
+    assert(SparkSession.active == session)
+    SparkSession.clearActiveSession()
+    assert(SparkSession.active == session)
+    SparkSession.clearDefaultSession()
+    intercept[IllegalStateException](SparkSession.active)
+    session.stop()
+  }
+
   test("config options are propagated to existing SparkSession") {
     val session1 = 
SparkSession.builder().master("local").config("spark-config1", 
"a").getOrCreate()
     assert(session1.conf.get("spark-config1") == "a")

http://git-wip-us.apache.org/repos/asf/spark/blob/359375ef/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
index 3038b82..17603de 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
@@ -35,6 +35,7 @@ private[spark] class TestSparkSession(sc: SparkContext) 
extends SparkSession(sc)
   }
 
   SparkSession.setDefaultSession(this)
+  SparkSession.setActiveSession(this)
 
   @transient
   override lazy val sessionState: SessionState = {

http://git-wip-us.apache.org/repos/asf/spark/blob/359375ef/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 814038d..a7006a1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -179,6 +179,9 @@ private[hive] class TestHiveSparkSession(
       loadTestTables)
   }
 
+  SparkSession.setDefaultSession(this)
+  SparkSession.setActiveSession(this)
+
   { // set the metastore temporary configuration
     val metastoreTempConf = 
HiveUtils.newTemporaryConfiguration(useInMemoryDerby = false) ++ Map(
       ConfVars.METASTORE_INTEGER_JDO_PUSHDOWN.varname -> "true",


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to