Passing version to broker on 'markReady'
Project: http://git-wip-us.apache.org/repos/asf/incubator-toree/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-toree/commit/b3ef81bb Tree: http://git-wip-us.apache.org/repos/asf/incubator-toree/tree/b3ef81bb Diff: http://git-wip-us.apache.org/repos/asf/incubator-toree/diff/b3ef81bb Branch: refs/heads/master Commit: b3ef81bb04d35d92469a7b285bb55971f59dff8a Parents: 6a77283 Author: Liam Fisk <[email protected]> Authored: Fri Jul 1 09:45:38 2016 +1200 Committer: Marius van Niekerk <[email protected]> Committed: Thu Oct 20 17:50:59 2016 -0400 ---------------------------------------------------------------------- .../org/apache/toree/interpreter/Interpreter.scala | 2 +- .../toree/interpreter/broker/BrokerState.scala | 17 ++++++++++++++--- .../toree/interpreter/broker/BrokerStateSpec.scala | 6 +++--- .../v5/handler/KernelInfoRequestHandlerSpec.scala | 2 +- .../toree/kernel/protocol/v5/LanguageInfo.scala | 2 +- .../protocol/v5/content/KernelInfoReplySpec.scala | 2 +- .../src/main/resources/PySpark/pyspark_runner.py | 4 ++-- .../interpreter/pyspark/PySparkInterpreter.scala | 1 + .../interpreter/scala/ScalaInterpreter.scala | 2 +- .../interpreter/sparkr/SparkRInterpreter.scala | 4 ++-- .../kernel/interpreter/sql/SqlInterpreter.scala | 2 +- 11 files changed, 28 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/b3ef81bb/kernel-api/src/main/scala/org/apache/toree/interpreter/Interpreter.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/org/apache/toree/interpreter/Interpreter.scala b/kernel-api/src/main/scala/org/apache/toree/interpreter/Interpreter.scala index edf1e71..bfff81c 100644 --- a/kernel-api/src/main/scala/org/apache/toree/interpreter/Interpreter.scala +++ b/kernel-api/src/main/scala/org/apache/toree/interpreter/Interpreter.scala @@ -25,7 +25,7 @@ import scala.tools.nsc.interpreter._ case class LanguageInfo( name: String, - version: Option[String] = None, + version: String, fileExtension: Option[String] = None, pygmentsLexer: Option[String] = None) { } http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/b3ef81bb/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerState.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerState.scala b/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerState.scala index 4595a92..43ee65c 100644 --- a/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerState.scala +++ b/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerState.scala @@ -37,7 +37,9 @@ class BrokerState(private val maxQueuedCode: Int) { import scala.collection.JavaConverters._ - private var _isReady: Boolean = false + @volatile private var _isReady: Boolean = false + @volatile private var _version: String = _ + protected val codeQueue: java.util.Queue[BrokerCode] = new java.util.concurrent.ConcurrentLinkedQueue[BrokerCode]() protected val promiseMap: collection.mutable.Map[CodeId, BrokerPromise] = @@ -123,8 +125,18 @@ class BrokerState(private val maxQueuedCode: Int) { /** * Marks the state of broker as ready. + + * @param version The language version used by the broker service */ - def markReady(): Unit = _isReady = true + def markReady(version: String): Unit = { + _isReady = true + _version = version + } + + /** + * Retrieve the runtime language version used by the broker service + */ + def getVersion(): String = _version /** * Marks the specified code as successfully completed using its id. @@ -192,4 +204,3 @@ class BrokerState(private val maxQueuedCode: Int) { } } } - http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/b3ef81bb/kernel-api/src/test/scala/org/apache/toree/interpreter/broker/BrokerStateSpec.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/test/scala/org/apache/toree/interpreter/broker/BrokerStateSpec.scala b/kernel-api/src/test/scala/org/apache/toree/interpreter/broker/BrokerStateSpec.scala index 43374f8..3617816 100644 --- a/kernel-api/src/test/scala/org/apache/toree/interpreter/broker/BrokerStateSpec.scala +++ b/kernel-api/src/test/scala/org/apache/toree/interpreter/broker/BrokerStateSpec.scala @@ -78,7 +78,7 @@ class BrokerStateSpec extends FunSpec with Matchers with OneInstancePerTest { describe("#isReady") { it("should return true if the broker state is marked as ready") { - brokerState.markReady() + brokerState.markReady("1.0.0") brokerState.isReady should be (true) } @@ -90,11 +90,11 @@ class BrokerStateSpec extends FunSpec with Matchers with OneInstancePerTest { describe("#markReady") { it("should mark the state of the broker as ready") { // Mark once to make sure that the state gets set - brokerState.markReady() + brokerState.markReady("1.0.0") brokerState.isReady should be (true) // Mark a second time to ensure that the state does not change - brokerState.markReady() + brokerState.markReady("1.0.0") brokerState.isReady should be (true) } } http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/b3ef81bb/kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/handler/KernelInfoRequestHandlerSpec.scala ---------------------------------------------------------------------- diff --git a/kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/handler/KernelInfoRequestHandlerSpec.scala b/kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/handler/KernelInfoRequestHandlerSpec.scala index 252f64a..73aa2a3 100644 --- a/kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/handler/KernelInfoRequestHandlerSpec.scala +++ b/kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/handler/KernelInfoRequestHandlerSpec.scala @@ -42,7 +42,7 @@ class KernelInfoRequestHandlerSpec extends TestKit( ConfigFactory.parseString(KernelInfoRequestHandlerSpec.config)) ) with ImplicitSender with FunSpecLike with Matchers with MockitoSugar { val actorLoader: ActorLoader = mock[ActorLoader] - val actor = system.actorOf(Props(classOf[KernelInfoRequestHandler], actorLoader, LanguageInfo("test", Some("1.0.0"), Some(".test")))) + val actor = system.actorOf(Props(classOf[KernelInfoRequestHandler], actorLoader, LanguageInfo("test", "1.0.0", Some(".test")))) val relayProbe : TestProbe = TestProbe() val relaySelection : ActorSelection = http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/b3ef81bb/protocol/src/main/scala/org/apache/toree/kernel/protocol/v5/LanguageInfo.scala ---------------------------------------------------------------------- diff --git a/protocol/src/main/scala/org/apache/toree/kernel/protocol/v5/LanguageInfo.scala b/protocol/src/main/scala/org/apache/toree/kernel/protocol/v5/LanguageInfo.scala index d2d99be..161181e 100644 --- a/protocol/src/main/scala/org/apache/toree/kernel/protocol/v5/LanguageInfo.scala +++ b/protocol/src/main/scala/org/apache/toree/kernel/protocol/v5/LanguageInfo.scala @@ -19,7 +19,7 @@ package org.apache.toree.kernel.protocol.v5 case class LanguageInfo( name: String, - version: Option[String] = None, + version: String, file_extension: Option[String] = None, pygments_lexer: Option[String] = None) { } http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/b3ef81bb/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/KernelInfoReplySpec.scala ---------------------------------------------------------------------- diff --git a/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/KernelInfoReplySpec.scala b/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/KernelInfoReplySpec.scala index 68c901c..f138725 100644 --- a/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/KernelInfoReplySpec.scala +++ b/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/content/KernelInfoReplySpec.scala @@ -34,7 +34,7 @@ class KernelInfoReplySpec extends FunSpec with Matchers { """) val kernelInfoReply: KernelInfoReply = KernelInfoReply( - "x.y.z", "<name>", "z.y.x", LanguageInfo("<some language>", Some("a.b.c"), Some("<some extension>")), "<some banner>" + "x.y.z", "<name>", "z.y.x", LanguageInfo("<some language>", "a.b.c", Some("<some extension>")), "<some banner>" ) describe("KernelInfoReply") { http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/b3ef81bb/pyspark-interpreter/src/main/resources/PySpark/pyspark_runner.py ---------------------------------------------------------------------- diff --git a/pyspark-interpreter/src/main/resources/PySpark/pyspark_runner.py b/pyspark-interpreter/src/main/resources/PySpark/pyspark_runner.py index 5073a4d..f73805f 100644 --- a/pyspark-interpreter/src/main/resources/PySpark/pyspark_runner.py +++ b/pyspark-interpreter/src/main/resources/PySpark/pyspark_runner.py @@ -15,7 +15,7 @@ # limitations under the License. # -import sys, getopt, traceback, re, ast +import sys, getopt, traceback, re, ast, platform print("PYTHON::: Starting imports") from py4j.java_gateway import java_import, JavaGateway, GatewayClient @@ -53,7 +53,7 @@ java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*") bridge = gateway.entry_point state = bridge.state() -state.markReady() +state.markReady(platform.python_version()) if sparkVersion.startswith("1.2"): java_import(gateway.jvm, "org.apache.spark.sql.SparkSession") http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/b3ef81bb/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkInterpreter.scala ---------------------------------------------------------------------- diff --git a/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkInterpreter.scala b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkInterpreter.scala index e347ed8..632f52d 100644 --- a/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkInterpreter.scala +++ b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkInterpreter.scala @@ -151,6 +151,7 @@ class PySparkInterpreter( // TODO Identify how to plumb python version to here override def languageInfo = LanguageInfo( "python", + pySparkState.getVersion(), fileExtension = Some(".py"), pygmentsLexer = Some("ipython2")) http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/b3ef81bb/scala-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/scala/ScalaInterpreter.scala ---------------------------------------------------------------------- diff --git a/scala-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/scala/ScalaInterpreter.scala b/scala-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/scala/ScalaInterpreter.scala index 464c223..4c04436 100644 --- a/scala-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/scala/ScalaInterpreter.scala +++ b/scala-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/scala/ScalaInterpreter.scala @@ -338,6 +338,6 @@ object ScalaInterpreter { } - override def languageInfo = LanguageInfo("scala", Some(BuildInfo.scalaVersion), Some(".scala")) + override def languageInfo = LanguageInfo("scala", BuildInfo.scalaVersion, fileExtension = Some(".scala")) } http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/b3ef81bb/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRInterpreter.scala ---------------------------------------------------------------------- diff --git a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRInterpreter.scala b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRInterpreter.scala index c067d5b..d6be29b 100644 --- a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRInterpreter.scala +++ b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRInterpreter.scala @@ -138,7 +138,7 @@ class SparkRInterpreter( // Unsupported override def doQuietly[T](body: => T): T = ??? - - override def languageInfo = LanguageInfo("R") + + override def languageInfo = LanguageInfo("scala", BuildInfo.scalaVersion, fileExtension = Some(".scala")) } http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/b3ef81bb/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlInterpreter.scala ---------------------------------------------------------------------- diff --git a/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlInterpreter.scala b/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlInterpreter.scala index fa7fc2b..b6e272f 100644 --- a/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlInterpreter.scala +++ b/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlInterpreter.scala @@ -107,6 +107,6 @@ class SqlInterpreter() extends Interpreter { // Unsupported override def doQuietly[T](body: => T): T = ??? - override def languageInfo = LanguageInfo("SQL") + override def languageInfo = LanguageInfo("scala", BuildInfo.scalaVersion, fileExtension = Some(".scala")) }
