Fixed sqlContext not appearing

Project: http://git-wip-us.apache.org/repos/asf/incubator-toree/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-toree/commit/73cc589d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-toree/tree/73cc589d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-toree/diff/73cc589d

Branch: refs/heads/master
Commit: 73cc589d412277a1fd1c79e70945bc1d2cccd33d
Parents: 3905e47
Author: Chip Senkbeil <pair+rcsen...@us.ibm.com>
Authored: Tue Dec 8 11:47:18 2015 -0600
Committer: Chip Senkbeil <pair+rcsen...@us.ibm.com>
Committed: Tue Dec 8 11:49:02 2015 -0600

----------------------------------------------------------------------
 .../com/ibm/spark/interpreter/Interpreter.scala | 15 ++++++-
 .../boot/layer/ComponentInitialization.scala    | 45 --------------------
 .../scala/com/ibm/spark/kernel/api/Kernel.scala | 45 ++++++++++++++++++--
 .../com/ibm/spark/kernel/api/KernelSpec.scala   |  1 -
 .../scala/test/utils/DummyInterpreter.scala     | 16 +++++++
 .../pyspark/PySparkInterpreter.scala            |  8 ++--
 .../interpreter/scala/ScalaInterpreter.scala    | 41 +++++++++++++-----
 .../interpreter/sparkr/SparkRInterpreter.scala  |  5 +++
 .../kernel/interpreter/sql/SqlInterpreter.scala |  4 ++
 9 files changed, 116 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/73cc589d/kernel-api/src/main/scala/com/ibm/spark/interpreter/Interpreter.scala
----------------------------------------------------------------------
diff --git a/kernel-api/src/main/scala/com/ibm/spark/interpreter/Interpreter.scala b/kernel-api/src/main/scala/com/ibm/spark/interpreter/Interpreter.scala
index 76d4432..6200b9b 100644
--- a/kernel-api/src/main/scala/com/ibm/spark/interpreter/Interpreter.scala
+++ b/kernel-api/src/main/scala/com/ibm/spark/interpreter/Interpreter.scala
@@ -20,6 +20,7 @@ import java.net.URL
 
 import com.ibm.spark.kernel.api.KernelLike
 import org.apache.spark.SparkContext
+import org.apache.spark.sql.SQLContext
 
 import scala.tools.nsc.interpreter._
 
@@ -79,7 +80,19 @@ trait Interpreter {
    */
   def doQuietly[T](body: => T): T
 
-  def bindSparkContext(sparkContext: SparkContext): Unit = ???
+  /**
+   * Binds the SparkContext instance to the interpreter's namespace.
+   *
+   * @param sparkContext The SparkContext to bind
+   */
+  def bindSparkContext(sparkContext: SparkContext): Unit
+
+  /**
+   * Binds the SQLContext instance to the interpreter's namespace.
+   *
+   * @param sqlContext The SQLContext to bind
+   */
+  def bindSqlContext(sqlContext: SQLContext): Unit
 
   /**
    * Binds a variable in the interpreter to a value.
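Note: bindSparkContext previously defaulted to ??? (a NotImplementedError at
runtime); both bind methods are now abstract, so every Interpreter
implementation must supply them. Interpreters without a bindable namespace
can satisfy the contract with no-ops, as the PySpark, SparkR, and SQL
interpreters below do. A minimal sketch, assuming only the imports shown in
this hunk:

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.SQLContext

    // No-op implementations for interpreters that cannot inject
    // variables into their execution namespace
    override def bindSparkContext(sparkContext: SparkContext): Unit = {}
    override def bindSqlContext(sqlContext: SQLContext): Unit = {}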

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/73cc589d/kernel/src/main/scala/com/ibm/spark/boot/layer/ComponentInitialization.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/com/ibm/spark/boot/layer/ComponentInitialization.scala b/kernel/src/main/scala/com/ibm/spark/boot/layer/ComponentInitialization.scala
index 973075b..939b896 100644
--- a/kernel/src/main/scala/com/ibm/spark/boot/layer/ComponentInitialization.scala
+++ b/kernel/src/main/scala/com/ibm/spark/boot/layer/ComponentInitialization.scala
@@ -128,51 +128,6 @@ trait StandardComponentInitialization extends ComponentInitialization {
     dependencyDownloader
   }
 
-  protected[layer] def initializeSqlContext(sparkContext: SparkContext) = {
-    val sqlContext: SQLContext = try {
-      logger.info("Attempting to create Hive Context")
-      val hiveContextClassString =
-        "org.apache.spark.sql.hive.HiveContext"
-
-      logger.debug(s"Looking up $hiveContextClassString")
-      val hiveContextClass = Class.forName(hiveContextClassString)
-
-      val sparkContextClass = classOf[SparkContext]
-      val sparkContextClassName = sparkContextClass.getName
-
-      logger.debug(s"Searching for constructor taking $sparkContextClassName")
-      val hiveContextContructor =
-        hiveContextClass.getConstructor(sparkContextClass)
-
-      logger.debug("Invoking Hive Context constructor")
-      hiveContextContructor.newInstance(sparkContext).asInstanceOf[SQLContext]
-    } catch {
-      case _: Throwable =>
-        logger.warn("Unable to create Hive Context! Defaulting to SQL Context!")
-        new SQLContext(sparkContext)
-    }
-
-    sqlContext
-  }
-
-  protected[layer] def updateInterpreterWithSqlContext(
-    sqlContext: SQLContext, interpreter: Interpreter
-  ): Unit = {
-    interpreter.doQuietly {
-      // TODO: This only adds the context to the main interpreter AND
-      //       is limited to the Scala interpreter interface
-      logger.debug("Adding SQL Context to main interpreter")
-      interpreter.bind(
-        "sqlContext",
-        classOf[SQLContext].getName,
-        sqlContext,
-        List( """@transient""")
-      )
-
-      sqlContext
-    }
-  }
-
   protected def initializeResponseMap(): collection.mutable.Map[String, ActorRef] =
     new ConcurrentHashMap[String, ActorRef]().asScala
 

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/73cc589d/kernel/src/main/scala/com/ibm/spark/kernel/api/Kernel.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/com/ibm/spark/kernel/api/Kernel.scala b/kernel/src/main/scala/com/ibm/spark/kernel/api/Kernel.scala
index 3ff4f85..219804a 100644
--- a/kernel/src/main/scala/com/ibm/spark/kernel/api/Kernel.scala
+++ b/kernel/src/main/scala/com/ibm/spark/kernel/api/Kernel.scala
@@ -329,11 +329,13 @@ class Kernel (
     _sparkConf = createSparkConf(conf)
     _sparkContext = initializeSparkContext(sparkConf)
     _javaSparkContext = new JavaSparkContext(_sparkContext)
-    _sqlContext = new SQLContext(_sparkContext)
+    _sqlContext = initializeSqlContext(_sparkContext)
 
-    logger.info( s"Connecting to spark.master 
${_sparkConf.getOption("spark.master").getOrElse("not_set")}")
+    val sparkMaster = _sparkConf.getOption("spark.master").getOrElse("not_set")
+    logger.info( s"Connecting to spark.master $sparkMaster")
 
-    updateInterpreterWithSparkContext(sparkContext)
+    updateInterpreterWithSparkContext(interpreter, sparkContext)
+    updateInterpreterWithSqlContext(interpreter, sqlContext)
 
     magicLoader.dependencyMap =
       magicLoader.dependencyMap.setSparkContext(_sparkContext)
@@ -394,12 +396,47 @@ class Kernel (
 
   // TODO: Think of a better way to test without exposing this
   protected[kernel] def updateInterpreterWithSparkContext(
-    sparkContext: SparkContext
+    interpreter: Interpreter, sparkContext: SparkContext
   ) = {
 
     interpreter.bindSparkContext(sparkContext)
   }
 
+  protected[kernel] def initializeSqlContext(
+    sparkContext: SparkContext
+  ): SQLContext = {
+    val sqlContext: SQLContext = try {
+      logger.info("Attempting to create Hive Context")
+      val hiveContextClassString =
+        "org.apache.spark.sql.hive.HiveContext"
+
+      logger.debug(s"Looking up $hiveContextClassString")
+      val hiveContextClass = Class.forName(hiveContextClassString)
+
+      val sparkContextClass = classOf[SparkContext]
+      val sparkContextClassName = sparkContextClass.getName
+
+      logger.debug(s"Searching for constructor taking $sparkContextClassName")
+      val hiveContextContructor =
+        hiveContextClass.getConstructor(sparkContextClass)
+
+      logger.debug("Invoking Hive Context constructor")
+      hiveContextContructor.newInstance(sparkContext).asInstanceOf[SQLContext]
+    } catch {
+      case _: Throwable =>
+        logger.warn("Unable to create Hive Context! Defaulting to SQL Context!")
+        new SQLContext(sparkContext)
+    }
+
+    sqlContext
+  }
+
+  protected[kernel] def updateInterpreterWithSqlContext(
+    interpreter: Interpreter, sqlContext: SQLContext
+  ): Unit = {
+    interpreter.bindSqlContext(sqlContext)
+  }
+
   override def interpreter(name: String): Option[Interpreter] = {
     interpreterManager.interpreters.get(name)
   }
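
Note: the reflective lookup above lets the kernel prefer a HiveContext when
the hive classes are on the classpath, without taking a compile-time
dependency on spark-hive. A self-contained sketch of the same fallback
pattern, assuming Spark 1.x and a hypothetical SqlContextFactory name:

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.SQLContext

    object SqlContextFactory {
      // Try to build org.apache.spark.sql.hive.HiveContext reflectively;
      // fall back to a plain SQLContext if the class is missing or fails.
      def create(sc: SparkContext): SQLContext =
        try {
          Class.forName("org.apache.spark.sql.hive.HiveContext")
            .getConstructor(classOf[SparkContext])
            .newInstance(sc)
            .asInstanceOf[SQLContext]
        } catch {
          case _: Throwable => new SQLContext(sc)
        }
    }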

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/73cc589d/kernel/src/test/scala/com/ibm/spark/kernel/api/KernelSpec.scala
----------------------------------------------------------------------
diff --git a/kernel/src/test/scala/com/ibm/spark/kernel/api/KernelSpec.scala b/kernel/src/test/scala/com/ibm/spark/kernel/api/KernelSpec.scala
index 98157a9..58ea0c5 100644
--- a/kernel/src/test/scala/com/ibm/spark/kernel/api/KernelSpec.scala
+++ b/kernel/src/test/scala/com/ibm/spark/kernel/api/KernelSpec.scala
@@ -10,7 +10,6 @@ import com.ibm.spark.kernel.protocol.v5.kernel.ActorLoader
 import com.ibm.spark.magic.MagicLoader
 import com.typesafe.config.Config
 import org.apache.spark.{SparkConf, SparkContext}
-import org.mockito.ArgumentCaptor
 import org.mockito.Mockito._
 import org.mockito.Matchers._
 import org.scalatest.mock.MockitoSugar

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/73cc589d/kernel/src/test/scala/test/utils/DummyInterpreter.scala
----------------------------------------------------------------------
diff --git a/kernel/src/test/scala/test/utils/DummyInterpreter.scala b/kernel/src/test/scala/test/utils/DummyInterpreter.scala
index ee7c096..aec9635 100644
--- a/kernel/src/test/scala/test/utils/DummyInterpreter.scala
+++ b/kernel/src/test/scala/test/utils/DummyInterpreter.scala
@@ -5,6 +5,8 @@ import java.net.URL
 import com.ibm.spark.interpreter.{ExecuteFailure, ExecuteOutput, Interpreter}
 import com.ibm.spark.interpreter.Results.Result
 import com.ibm.spark.kernel.api.KernelLike
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.SQLContext
 
 import scala.tools.nsc.interpreter.{OutputStream, InputStream}
 
@@ -109,4 +111,18 @@ class DummyInterpreter(kernel: KernelLike) extends Interpreter {
    * @return The newly initialized interpreter
    */
   override def init(kernel: KernelLike): Interpreter = ???
+
+  /**
+   * Binds the SparkContext instance to the interpreter's namespace.
+   *
+   * @param sparkContext The SparkContext to bind
+   */
+  override def bindSparkContext(sparkContext: SparkContext): Unit = ???
+
+  /**
+   * Binds the SQLContext instance to the interpreter's namespace.
+   *
+   * @param sqlContext The SQLContext to bind
+   */
+  override def bindSqlContext(sqlContext: SQLContext): Unit = ???
 }

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/73cc589d/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkInterpreter.scala
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkInterpreter.scala b/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkInterpreter.scala
index 38e1d68..615ed19 100644
--- a/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkInterpreter.scala
+++ b/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkInterpreter.scala
@@ -21,6 +21,7 @@ import com.ibm.spark.interpreter.Results.Result
 import com.ibm.spark.interpreter._
 import com.ibm.spark.kernel.api.KernelLike
 import org.apache.spark.SparkContext
+import org.apache.spark.sql.SQLContext
 import org.slf4j.LoggerFactory
 import py4j.GatewayServer
 
@@ -78,10 +79,11 @@ class PySparkInterpreter(
     this
   }
 
+  // Unsupported (but can be invoked)
+  override def bindSparkContext(sparkContext: SparkContext): Unit = {}
 
-  override def bindSparkContext(sparkContext: SparkContext) = {
-
-  }
+  // Unsupported (but can be invoked)
+  override def bindSqlContext(sqlContext: SQLContext): Unit = {}
 
   /**
    * Executes the provided code with the option to silence output.

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/73cc589d/scala-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/scala/ScalaInterpreter.scala
----------------------------------------------------------------------
diff --git a/scala-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/scala/ScalaInterpreter.scala b/scala-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/scala/ScalaInterpreter.scala
index 078054a..ca9bd7a 100644
--- a/scala-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/scala/ScalaInterpreter.scala
+++ b/scala-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/scala/ScalaInterpreter.scala
@@ -31,6 +31,7 @@ import com.ibm.spark.kernel.api.{KernelLike, KernelOptions}
 import com.ibm.spark.utils.{MultiOutputStream, TaskManager}
 import org.apache.spark.SparkContext
 import org.apache.spark.repl.{SparkCommandLine, SparkIMain, SparkJLineCompletion}
+import org.apache.spark.sql.SQLContext
 import org.slf4j.LoggerFactory
 
 import scala.annotation.tailrec
@@ -514,12 +515,16 @@ class ScalaInterpreter() extends Interpreter {
   }
 
   override def bindSparkContext(sparkContext: SparkContext) = {
+    val bindName = "sc"
 
     doQuietly {
-      logger.debug("Binding context into interpreter")
+      logger.debug(s"Binding SparkContext into interpreter as $bindName")
       bind(
-        "sc", "org.apache.spark.SparkContext",
-        sparkContext, List( """@transient"""))
+        bindName,
+        "org.apache.spark.SparkContext",
+        sparkContext,
+        List( """@transient""")
+      )
 
       // NOTE: This is needed because interpreter blows up after adding
       //       dependencies to SparkContext and Interpreter before the
@@ -530,16 +535,32 @@ class ScalaInterpreter() extends Interpreter {
       logger.debug("Initializing Spark cluster in interpreter")
 
       doQuietly {
-        interpret("""
-                                | val $toBeNulled = {
-                                | var $toBeNulled = sc.emptyRDD.collect()
-                                | $toBeNulled = null
-                                |  }
-                                |
-                                |""".stripMargin)
+        interpret(Seq(
+          "val $toBeNulled = {",
+          "  var $toBeNulled = sc.emptyRDD.collect()",
+          "  $toBeNulled = null",
+          "}"
+        ).mkString("\n").trim())
       }
     }
+  }
+
+  override def bindSqlContext(sqlContext: SQLContext): Unit = {
+    val bindName = "sqlContext"
+
+    doQuietly {
+      // TODO: This only adds the context to the main interpreter AND
+      //       is limited to the Scala interpreter interface
+      logger.debug(s"Binding SQLContext into interpreter as $bindName")
+      bind(
+        bindName,
+        classOf[SQLContext].getName,
+        sqlContext,
+        List( """@transient""")
+      )
 
+      sqlContext
+    }
   }
 
   override def bind(
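
Note: the contexts are bound @transient so that user closures shipped to
executors do not attempt to serialize them. Once bound, kernel cells can use
sqlContext directly. A usage sketch, assuming Spark 1.x and a hypothetical
/tmp/people.json input file:

    // Runs inside a kernel cell once the bindings above are in place
    val df = sqlContext.read.json("/tmp/people.json") // hypothetical path
    df.registerTempTable("people")
    sqlContext.sql("SELECT name FROM people").show()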

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/73cc589d/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRInterpreter.scala
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRInterpreter.scala b/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRInterpreter.scala
index a950da0..45fe03c 100644
--- a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRInterpreter.scala
+++ b/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRInterpreter.scala
@@ -21,6 +21,7 @@ import com.ibm.spark.interpreter.Results.Result
 import com.ibm.spark.interpreter._
 import com.ibm.spark.kernel.api.KernelLike
 import org.apache.spark.SparkContext
+import org.apache.spark.sql.SQLContext
 import org.slf4j.LoggerFactory
 
 import scala.concurrent.Await
@@ -133,8 +134,12 @@ class SparkRInterpreter(
   // Unsupported
   override def classServerURI: String = ""
 
+  // Unsupported (but can be invoked)
   override def bindSparkContext(sparkContext: SparkContext): Unit = {}
 
+  // Unsupported (but can be invoked)
+  override def bindSqlContext(sqlContext: SQLContext): Unit = {}
+
   // Unsupported
   override def interrupt(): Interpreter = ???
 

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/73cc589d/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlInterpreter.scala
----------------------------------------------------------------------
diff --git a/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlInterpreter.scala b/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlInterpreter.scala
index 837d917..889d4a6 100644
--- a/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlInterpreter.scala
+++ b/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlInterpreter.scala
@@ -102,8 +102,12 @@ class SqlInterpreter() extends Interpreter {
   // Unsupported
   override def classServerURI: String = ""
 
+  // Unsupported (but can be invoked)
   override def bindSparkContext(sparkContext: SparkContext): Unit = {}
 
+  // Unsupported (but can be invoked)
+  override def bindSqlContext(sqlContext: SQLContext): Unit = {}
+
   // Unsupported
   override def interrupt(): Interpreter = ???
 
