Fixed sqlContext not appearing in the interpreter namespace
Project: http://git-wip-us.apache.org/repos/asf/incubator-toree/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-toree/commit/73cc589d Tree: http://git-wip-us.apache.org/repos/asf/incubator-toree/tree/73cc589d Diff: http://git-wip-us.apache.org/repos/asf/incubator-toree/diff/73cc589d Branch: refs/heads/master Commit: 73cc589d412277a1fd1c79e70945bc1d2cccd33d Parents: 3905e47 Author: Chip Senkbeil <pair+rcsen...@us.ibm.com> Authored: Tue Dec 8 11:47:18 2015 -0600 Committer: Chip Senkbeil <pair+rcsen...@us.ibm.com> Committed: Tue Dec 8 11:49:02 2015 -0600 ---------------------------------------------------------------------- .../com/ibm/spark/interpreter/Interpreter.scala | 15 ++++++- .../boot/layer/ComponentInitialization.scala | 45 -------------------- .../scala/com/ibm/spark/kernel/api/Kernel.scala | 45 ++++++++++++++++++-- .../com/ibm/spark/kernel/api/KernelSpec.scala | 1 - .../scala/test/utils/DummyInterpreter.scala | 16 +++++++ .../pyspark/PySparkInterpreter.scala | 8 ++-- .../interpreter/scala/ScalaInterpreter.scala | 41 +++++++++++++----- .../interpreter/sparkr/SparkRInterpreter.scala | 5 +++ .../kernel/interpreter/sql/SqlInterpreter.scala | 4 ++ 9 files changed, 116 insertions(+), 64 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/73cc589d/kernel-api/src/main/scala/com/ibm/spark/interpreter/Interpreter.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/com/ibm/spark/interpreter/Interpreter.scala b/kernel-api/src/main/scala/com/ibm/spark/interpreter/Interpreter.scala index 76d4432..6200b9b 100644 --- a/kernel-api/src/main/scala/com/ibm/spark/interpreter/Interpreter.scala +++ b/kernel-api/src/main/scala/com/ibm/spark/interpreter/Interpreter.scala @@ -20,6 +20,7 @@ import java.net.URL import com.ibm.spark.kernel.api.KernelLike import org.apache.spark.SparkContext +import 
org.apache.spark.sql.SQLContext import scala.tools.nsc.interpreter._ @@ -79,7 +80,19 @@ trait Interpreter { */ def doQuietly[T](body: => T): T - def bindSparkContext(sparkContext: SparkContext): Unit = ??? + /** + * Binds the SparkContext instance to the interpreter's namespace. + * + * @param sparkContext The SparkContext to bind + */ + def bindSparkContext(sparkContext: SparkContext): Unit + + /** + * Binds the SQLContext instance to the interpreter's namespace. + * + * @param sqlContext The SQLContext to bind + */ + def bindSqlContext(sqlContext: SQLContext): Unit /** * Binds a variable in the interpreter to a value. http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/73cc589d/kernel/src/main/scala/com/ibm/spark/boot/layer/ComponentInitialization.scala ---------------------------------------------------------------------- diff --git a/kernel/src/main/scala/com/ibm/spark/boot/layer/ComponentInitialization.scala b/kernel/src/main/scala/com/ibm/spark/boot/layer/ComponentInitialization.scala index 973075b..939b896 100644 --- a/kernel/src/main/scala/com/ibm/spark/boot/layer/ComponentInitialization.scala +++ b/kernel/src/main/scala/com/ibm/spark/boot/layer/ComponentInitialization.scala @@ -128,51 +128,6 @@ trait StandardComponentInitialization extends ComponentInitialization { dependencyDownloader } - protected[layer] def initializeSqlContext(sparkContext: SparkContext) = { - val sqlContext: SQLContext = try { - logger.info("Attempting to create Hive Context") - val hiveContextClassString = - "org.apache.spark.sql.hive.HiveContext" - - logger.debug(s"Looking up $hiveContextClassString") - val hiveContextClass = Class.forName(hiveContextClassString) - - val sparkContextClass = classOf[SparkContext] - val sparkContextClassName = sparkContextClass.getName - - logger.debug(s"Searching for constructor taking $sparkContextClassName") - val hiveContextContructor = - hiveContextClass.getConstructor(sparkContextClass) - - logger.debug("Invoking Hive Context 
constructor") - hiveContextContructor.newInstance(sparkContext).asInstanceOf[SQLContext] - } catch { - case _: Throwable => - logger.warn("Unable to create Hive Context! Defaulting to SQL Context!") - new SQLContext(sparkContext) - } - - sqlContext - } - - protected[layer] def updateInterpreterWithSqlContext( - sqlContext: SQLContext, interpreter: Interpreter - ): Unit = { - interpreter.doQuietly { - // TODO: This only adds the context to the main interpreter AND - // is limited to the Scala interpreter interface - logger.debug("Adding SQL Context to main interpreter") - interpreter.bind( - "sqlContext", - classOf[SQLContext].getName, - sqlContext, - List( """@transient""") - ) - - sqlContext - } - } - protected def initializeResponseMap(): collection.mutable.Map[String, ActorRef] = new ConcurrentHashMap[String, ActorRef]().asScala http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/73cc589d/kernel/src/main/scala/com/ibm/spark/kernel/api/Kernel.scala ---------------------------------------------------------------------- diff --git a/kernel/src/main/scala/com/ibm/spark/kernel/api/Kernel.scala b/kernel/src/main/scala/com/ibm/spark/kernel/api/Kernel.scala index 3ff4f85..219804a 100644 --- a/kernel/src/main/scala/com/ibm/spark/kernel/api/Kernel.scala +++ b/kernel/src/main/scala/com/ibm/spark/kernel/api/Kernel.scala @@ -329,11 +329,13 @@ class Kernel ( _sparkConf = createSparkConf(conf) _sparkContext = initializeSparkContext(sparkConf) _javaSparkContext = new JavaSparkContext(_sparkContext) - _sqlContext = new SQLContext(_sparkContext) + _sqlContext = initializeSqlContext(_sparkContext) - logger.info( s"Connecting to spark.master ${_sparkConf.getOption("spark.master").getOrElse("not_set")}") + val sparkMaster = _sparkConf.getOption("spark.master").getOrElse("not_set") + logger.info( s"Connecting to spark.master $sparkMaster") - updateInterpreterWithSparkContext(sparkContext) + updateInterpreterWithSparkContext(interpreter, sparkContext) + 
updateInterpreterWithSqlContext(interpreter, sqlContext) magicLoader.dependencyMap = magicLoader.dependencyMap.setSparkContext(_sparkContext) @@ -394,12 +396,47 @@ class Kernel ( // TODO: Think of a better way to test without exposing this protected[kernel] def updateInterpreterWithSparkContext( - sparkContext: SparkContext + interpreter: Interpreter, sparkContext: SparkContext ) = { interpreter.bindSparkContext(sparkContext) } + protected[kernel] def initializeSqlContext( + sparkContext: SparkContext + ): SQLContext = { + val sqlContext: SQLContext = try { + logger.info("Attempting to create Hive Context") + val hiveContextClassString = + "org.apache.spark.sql.hive.HiveContext" + + logger.debug(s"Looking up $hiveContextClassString") + val hiveContextClass = Class.forName(hiveContextClassString) + + val sparkContextClass = classOf[SparkContext] + val sparkContextClassName = sparkContextClass.getName + + logger.debug(s"Searching for constructor taking $sparkContextClassName") + val hiveContextContructor = + hiveContextClass.getConstructor(sparkContextClass) + + logger.debug("Invoking Hive Context constructor") + hiveContextContructor.newInstance(sparkContext).asInstanceOf[SQLContext] + } catch { + case _: Throwable => + logger.warn("Unable to create Hive Context! 
Defaulting to SQL Context!") + new SQLContext(sparkContext) + } + + sqlContext + } + + protected[kernel] def updateInterpreterWithSqlContext( + interpreter: Interpreter, sqlContext: SQLContext + ): Unit = { + interpreter.bindSqlContext(sqlContext) + } + override def interpreter(name: String): Option[Interpreter] = { interpreterManager.interpreters.get(name) } http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/73cc589d/kernel/src/test/scala/com/ibm/spark/kernel/api/KernelSpec.scala ---------------------------------------------------------------------- diff --git a/kernel/src/test/scala/com/ibm/spark/kernel/api/KernelSpec.scala b/kernel/src/test/scala/com/ibm/spark/kernel/api/KernelSpec.scala index 98157a9..58ea0c5 100644 --- a/kernel/src/test/scala/com/ibm/spark/kernel/api/KernelSpec.scala +++ b/kernel/src/test/scala/com/ibm/spark/kernel/api/KernelSpec.scala @@ -10,7 +10,6 @@ import com.ibm.spark.kernel.protocol.v5.kernel.ActorLoader import com.ibm.spark.magic.MagicLoader import com.typesafe.config.Config import org.apache.spark.{SparkConf, SparkContext} -import org.mockito.ArgumentCaptor import org.mockito.Mockito._ import org.mockito.Matchers._ import org.scalatest.mock.MockitoSugar http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/73cc589d/kernel/src/test/scala/test/utils/DummyInterpreter.scala ---------------------------------------------------------------------- diff --git a/kernel/src/test/scala/test/utils/DummyInterpreter.scala b/kernel/src/test/scala/test/utils/DummyInterpreter.scala index ee7c096..aec9635 100644 --- a/kernel/src/test/scala/test/utils/DummyInterpreter.scala +++ b/kernel/src/test/scala/test/utils/DummyInterpreter.scala @@ -5,6 +5,8 @@ import java.net.URL import com.ibm.spark.interpreter.{ExecuteFailure, ExecuteOutput, Interpreter} import com.ibm.spark.interpreter.Results.Result import com.ibm.spark.kernel.api.KernelLike +import org.apache.spark.SparkContext +import org.apache.spark.sql.SQLContext import 
scala.tools.nsc.interpreter.{OutputStream, InputStream} @@ -109,4 +111,18 @@ class DummyInterpreter(kernel: KernelLike) extends Interpreter { * @return The newly initialized interpreter */ override def init(kernel: KernelLike): Interpreter = ??? + + /** + * Binds the SparkContext instance to the interpreter's namespace. + * + * @param sparkContext The SparkContext to bind + */ + override def bindSparkContext(sparkContext: SparkContext): Unit = ??? + + /** + * Binds the SQLContext instance to the interpreter's namespace. + * + * @param sqlContext The SQLContext to bind + */ + override def bindSqlContext(sqlContext: SQLContext): Unit = ??? } http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/73cc589d/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkInterpreter.scala ---------------------------------------------------------------------- diff --git a/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkInterpreter.scala b/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkInterpreter.scala index 38e1d68..615ed19 100644 --- a/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkInterpreter.scala +++ b/pyspark-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/pyspark/PySparkInterpreter.scala @@ -21,6 +21,7 @@ import com.ibm.spark.interpreter.Results.Result import com.ibm.spark.interpreter._ import com.ibm.spark.kernel.api.KernelLike import org.apache.spark.SparkContext +import org.apache.spark.sql.SQLContext import org.slf4j.LoggerFactory import py4j.GatewayServer @@ -78,10 +79,11 @@ class PySparkInterpreter( this } + // Unsupported (but can be invoked) + override def bindSparkContext(sparkContext: SparkContext): Unit = {} - override def bindSparkContext(sparkContext: SparkContext) = { - - } + // Unsupported (but can be invoked) + override def bindSqlContext(sqlContext: SQLContext): Unit = {} /** * Executes the provided code 
with the option to silence output. http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/73cc589d/scala-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/scala/ScalaInterpreter.scala ---------------------------------------------------------------------- diff --git a/scala-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/scala/ScalaInterpreter.scala b/scala-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/scala/ScalaInterpreter.scala index 078054a..ca9bd7a 100644 --- a/scala-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/scala/ScalaInterpreter.scala +++ b/scala-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/scala/ScalaInterpreter.scala @@ -31,6 +31,7 @@ import com.ibm.spark.kernel.api.{KernelLike, KernelOptions} import com.ibm.spark.utils.{MultiOutputStream, TaskManager} import org.apache.spark.SparkContext import org.apache.spark.repl.{SparkCommandLine, SparkIMain, SparkJLineCompletion} +import org.apache.spark.sql.SQLContext import org.slf4j.LoggerFactory import scala.annotation.tailrec @@ -514,12 +515,16 @@ class ScalaInterpreter() extends Interpreter { } override def bindSparkContext(sparkContext: SparkContext) = { + val bindName = "sc" doQuietly { - logger.debug("Binding context into interpreter") + logger.debug(s"Binding SparkContext into interpreter as $bindName") bind( - "sc", "org.apache.spark.SparkContext", - sparkContext, List( """@transient""")) + bindName, + "org.apache.spark.SparkContext", + sparkContext, + List( """@transient""") + ) // NOTE: This is needed because interpreter blows up after adding // dependencies to SparkContext and Interpreter before the @@ -530,16 +535,32 @@ class ScalaInterpreter() extends Interpreter { logger.debug("Initializing Spark cluster in interpreter") doQuietly { - interpret(""" - | val $toBeNulled = { - | var $toBeNulled = sc.emptyRDD.collect() - | $toBeNulled = null - | } - | - |""".stripMargin) + interpret(Seq( + "val $toBeNulled = {", + " var $toBeNulled 
= sc.emptyRDD.collect()", + " $toBeNulled = null", + "}" + ).mkString("\n").trim()) } } + } + + override def bindSqlContext(sqlContext: SQLContext): Unit = { + val bindName = "sqlContext" + + doQuietly { + // TODO: This only adds the context to the main interpreter AND + // is limited to the Scala interpreter interface + logger.debug(s"Binding SQLContext into interpreter as $bindName") + bind( + bindName, + classOf[SQLContext].getName, + sqlContext, + List( """@transient""") + ) + sqlContext + } } override def bind( http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/73cc589d/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRInterpreter.scala ---------------------------------------------------------------------- diff --git a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRInterpreter.scala b/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRInterpreter.scala index a950da0..45fe03c 100644 --- a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRInterpreter.scala +++ b/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRInterpreter.scala @@ -21,6 +21,7 @@ import com.ibm.spark.interpreter.Results.Result import com.ibm.spark.interpreter._ import com.ibm.spark.kernel.api.KernelLike import org.apache.spark.SparkContext +import org.apache.spark.sql.SQLContext import org.slf4j.LoggerFactory import scala.concurrent.Await @@ -133,8 +134,12 @@ class SparkRInterpreter( // Unsupported override def classServerURI: String = "" + // Unsupported (but can be invoked) override def bindSparkContext(sparkContext: SparkContext): Unit = {} + // Unsupported (but can be invoked) + override def bindSqlContext(sqlContext: SQLContext): Unit = {} + // Unsupported override def interrupt(): Interpreter = ??? 
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/73cc589d/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlInterpreter.scala ---------------------------------------------------------------------- diff --git a/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlInterpreter.scala b/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlInterpreter.scala index 837d917..889d4a6 100644 --- a/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlInterpreter.scala +++ b/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlInterpreter.scala @@ -102,8 +102,12 @@ class SqlInterpreter() extends Interpreter { // Unsupported override def classServerURI: String = "" + // Unsupported (but can be invoked) override def bindSparkContext(sparkContext: SparkContext): Unit = {} + // Unsupported (but can be invoked) + override def bindSqlContext(sqlContext: SQLContext): Unit = {} + // Unsupported override def interrupt(): Interpreter = ???