Repository: spark
Updated Branches:
  refs/heads/master eb019af9a -> a432a2b86


[SPARK-15116] In REPL we should create SparkSession first and get SparkContext from it

## What changes were proposed in this pull request?

See https://github.com/apache/spark/pull/12873#discussion_r61993910. The
problem is that if we create a `SparkContext` first and then call
`SparkSession.builder.enableHiveSupport().getOrCreate()`, the builder reuses
the existing `SparkContext`, so the Hive flag is never set.
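
For illustration, a minimal sketch of the problematic ordering (the app name
and master below are placeholders, not part of this patch):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

// Problematic order: the SparkContext is created first...
val sc = new SparkContext(new SparkConf().setAppName("repro").setMaster("local[*]"))

// ...so getOrCreate() reuses the existing context, and the Hive catalog flag
// set by enableHiveSupport() never takes effect.
val spark = SparkSession.builder.enableHiveSupport().getOrCreate()

// Fixed order (what the REPL does after this patch): build the session first,
// then obtain the context from it.
// val spark = SparkSession.builder.enableHiveSupport().getOrCreate()
// val sc = spark.sparkContext
```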

## How was this patch tested?

Verified it locally.
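
A quick sanity check in `spark-shell` might look like the following (a sketch;
the config-key check assumes Hive classes are on the classpath):

```scala
// Both 'spark' and 'sc' are predefined by the REPL; after this change,
// sc is derived from the session rather than the other way around.
spark.sparkContext eq sc  // expected: true

// The session should be Hive-enabled when Hive classes are present.
spark.conf.get("spark.sql.catalogImplementation")  // expected: "hive"
```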

Author: Wenchen Fan <wenc...@databricks.com>

Closes #12890 from cloud-fan/repl.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a432a2b8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a432a2b8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a432a2b8

Branch: refs/heads/master
Commit: a432a2b86081a18cebf4085cead702436960f6c7
Parents: eb019af
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Wed May 4 14:40:54 2016 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Wed May 4 14:40:54 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/repl/SparkILoop.scala      | 20 ++++++---------
 .../org/apache/spark/repl/SparkILoopInit.scala  | 11 +++-----
 .../main/scala/org/apache/spark/repl/Main.scala | 27 +++++++++-----------
 .../org/apache/spark/repl/SparkILoop.scala      | 11 +++-----
 4 files changed, 26 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a432a2b8/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index c4f6450..b1e95d8 100644
--- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -1003,7 +1003,7 @@ class SparkILoop(
 
   // NOTE: Must be public for visibility
   @DeveloperApi
-  def createSparkContext(): SparkContext = {
+  def createSparkSession(): SparkSession = {
     val execUri = System.getenv("SPARK_EXECUTOR_URI")
     val jars = SparkILoop.getAddedJars
     val conf = new SparkConf()
@@ -1019,22 +1019,18 @@ class SparkILoop(
     if (execUri != null) {
       conf.set("spark.executor.uri", execUri)
     }
-    sparkContext = new SparkContext(conf)
-    logInfo("Created spark context..")
-    Signaling.cancelOnInterrupt(sparkContext)
-    sparkContext
-  }
 
-  @DeveloperApi
-  // TODO: don't duplicate this code
-  def createSparkSession(): SparkSession = {
-    if (SparkSession.hiveClassesArePresent) {
+    val builder = SparkSession.builder.config(conf)
+    val sparkSession = if (SparkSession.hiveClassesArePresent) {
       logInfo("Creating Spark session with Hive support")
-      SparkSession.builder.enableHiveSupport().getOrCreate()
+      builder.enableHiveSupport().getOrCreate()
     } else {
       logInfo("Creating Spark session")
-      SparkSession.builder.getOrCreate()
+      builder.getOrCreate()
     }
+    sparkContext = sparkSession.sparkContext
+    Signaling.cancelOnInterrupt(sparkContext)
+    sparkSession
   }
 
   private def getMaster(): String = {

http://git-wip-us.apache.org/repos/asf/spark/blob/a432a2b8/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
index f1febb9..29f63de 100644
--- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
@@ -123,19 +123,14 @@ private[repl] trait SparkILoopInit {
   def initializeSpark() {
     intp.beQuietDuring {
       command("""
+        @transient val spark = org.apache.spark.repl.Main.interp.createSparkSession()
         @transient val sc = {
-          val _sc = org.apache.spark.repl.Main.interp.createSparkContext()
+          val _sc = spark.sparkContext
           _sc.uiWebUrl.foreach(webUrl => println(s"Spark context Web UI available at ${webUrl}"))
           println("Spark context available as 'sc' " +
             s"(master = ${_sc.master}, app id = ${_sc.applicationId}).")
-          _sc
-        }
-        """)
-      command("""
-        @transient val spark = {
-          val _session = org.apache.spark.repl.Main.interp.createSparkSession()
           println("Spark session available as 'spark'.")
-          _session
+          _sc
         }
         """)
       command("import org.apache.spark.SparkContext._")

http://git-wip-us.apache.org/repos/asf/spark/blob/a432a2b8/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
index a171759..005edda 100644
--- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
+++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
@@ -71,35 +71,32 @@ object Main extends Logging {
     }
   }
 
-  def createSparkContext(): SparkContext = {
+  def createSparkSession(): SparkSession = {
     val execUri = System.getenv("SPARK_EXECUTOR_URI")
     conf.setIfMissing("spark.app.name", "Spark shell")
-      // SparkContext will detect this configuration and register it with the RpcEnv's
-      // file server, setting spark.repl.class.uri to the actual URI for executors to
-      // use. This is sort of ugly but since executors are started as part of SparkContext
-      // initialization in certain cases, there's an initialization order issue that prevents
-      // this from being set after SparkContext is instantiated.
-      .set("spark.repl.class.outputDir", outputDir.getAbsolutePath())
+    // SparkContext will detect this configuration and register it with the RpcEnv's
+    // file server, setting spark.repl.class.uri to the actual URI for executors to
+    // use. This is sort of ugly but since executors are started as part of SparkContext
+    // initialization in certain cases, there's an initialization order issue that prevents
+    // this from being set after SparkContext is instantiated.
+    conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath())
     if (execUri != null) {
       conf.set("spark.executor.uri", execUri)
     }
     if (System.getenv("SPARK_HOME") != null) {
       conf.setSparkHome(System.getenv("SPARK_HOME"))
     }
-    sparkContext = new SparkContext(conf)
-    logInfo("Created spark context..")
-    Signaling.cancelOnInterrupt(sparkContext)
-    sparkContext
-  }
 
-  def createSparkSession(): SparkSession = {
+    val builder = SparkSession.builder.config(conf)
     if (SparkSession.hiveClassesArePresent) {
-      sparkSession = SparkSession.builder.enableHiveSupport().getOrCreate()
+      sparkSession = builder.enableHiveSupport().getOrCreate()
       logInfo("Created Spark session with Hive support")
     } else {
-      sparkSession = SparkSession.builder.getOrCreate()
+      sparkSession = builder.getOrCreate()
       logInfo("Created Spark session")
     }
+    sparkContext = sparkSession.sparkContext
+    Signaling.cancelOnInterrupt(sparkContext)
     sparkSession
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a432a2b8/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index d74b796..bbdb992d 100644
--- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -36,19 +36,14 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
   def initializeSpark() {
     intp.beQuietDuring {
       processLine("""
+        @transient val spark = org.apache.spark.repl.Main.createSparkSession()
         @transient val sc = {
-          val _sc = org.apache.spark.repl.Main.createSparkContext()
+          val _sc = spark.sparkContext
           _sc.uiWebUrl.foreach(webUrl => println(s"Spark context Web UI available at ${webUrl}"))
           println("Spark context available as 'sc' " +
             s"(master = ${_sc.master}, app id = ${_sc.applicationId}).")
-          _sc
-        }
-        """)
-      processLine("""
-        @transient val spark = {
-          val _session = org.apache.spark.repl.Main.createSparkSession()
           println("Spark session available as 'spark'.")
-          _session
+          _sc
         }
         """)
       processLine("import org.apache.spark.SparkContext._")

