Added proper version detection for Python and R using subprocess calls.
Project: http://git-wip-us.apache.org/repos/asf/incubator-toree/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-toree/commit/15327b58 Tree: http://git-wip-us.apache.org/repos/asf/incubator-toree/tree/15327b58 Diff: http://git-wip-us.apache.org/repos/asf/incubator-toree/diff/15327b58 Branch: refs/heads/master Commit: 15327b589fcd2fcd64c9689a6e4b491e72b643cc Parents: 9ccf178 Author: Marius van Niekerk <[email protected]> Authored: Wed Nov 2 10:21:15 2016 -0400 Committer: Marius van Niekerk <[email protected]> Committed: Wed Nov 2 10:21:15 2016 -0400 ---------------------------------------------------------------------- .../toree/interpreter/broker/BrokerState.scala | 9 +----- .../interpreter/broker/BrokerStateSpec.scala | 6 ++-- .../main/resources/PySpark/pyspark_runner.py | 8 ++++-- .../pyspark/PySparkInterpreter.scala | 29 ++++++++++---------- .../interpreter/sparkr/SparkRInterpreter.scala | 14 +++++++++- .../interpreter/sparkr/SparkRProcess.scala | 4 ++- .../interpreter/sparkr/SparkRService.scala | 3 ++ 7 files changed, 44 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/15327b58/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerState.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerState.scala b/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerState.scala index 43ee65c..3d1e3ab 100644 --- a/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerState.scala +++ b/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerState.scala @@ -38,7 +38,6 @@ class BrokerState(private val maxQueuedCode: Int) { import scala.collection.JavaConverters._ @volatile private var _isReady: Boolean = false - @volatile private var _version: String = _ protected val codeQueue: 
java.util.Queue[BrokerCode] = new java.util.concurrent.ConcurrentLinkedQueue[BrokerCode]() @@ -128,17 +127,11 @@ class BrokerState(private val maxQueuedCode: Int) { * @param version The language version used by the broker service */ - def markReady(version: String): Unit = { + def markReady(): Unit = { _isReady = true - _version = version } /** - * Retrieve the runtime language version used by the broker service - */ - def getVersion(): String = _version - - /** * Marks the specified code as successfully completed using its id. * * @param codeId The id of the code to mark as a success http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/15327b58/kernel-api/src/test/scala/org/apache/toree/interpreter/broker/BrokerStateSpec.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/test/scala/org/apache/toree/interpreter/broker/BrokerStateSpec.scala b/kernel-api/src/test/scala/org/apache/toree/interpreter/broker/BrokerStateSpec.scala index 3617816..43374f8 100644 --- a/kernel-api/src/test/scala/org/apache/toree/interpreter/broker/BrokerStateSpec.scala +++ b/kernel-api/src/test/scala/org/apache/toree/interpreter/broker/BrokerStateSpec.scala @@ -78,7 +78,7 @@ class BrokerStateSpec extends FunSpec with Matchers with OneInstancePerTest { describe("#isReady") { it("should return true if the broker state is marked as ready") { - brokerState.markReady("1.0.0") + brokerState.markReady() brokerState.isReady should be (true) } @@ -90,11 +90,11 @@ class BrokerStateSpec extends FunSpec with Matchers with OneInstancePerTest { describe("#markReady") { it("should mark the state of the broker as ready") { // Mark once to make sure that the state gets set - brokerState.markReady("1.0.0") + brokerState.markReady() brokerState.isReady should be (true) // Mark a second time to ensure that the state does not change - brokerState.markReady("1.0.0") + brokerState.markReady() brokerState.isReady should be (true) } } 
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/15327b58/pyspark-interpreter/src/main/resources/PySpark/pyspark_runner.py ---------------------------------------------------------------------- diff --git a/pyspark-interpreter/src/main/resources/PySpark/pyspark_runner.py b/pyspark-interpreter/src/main/resources/PySpark/pyspark_runner.py index f73805f..e3864f8 100644 --- a/pyspark-interpreter/src/main/resources/PySpark/pyspark_runner.py +++ b/pyspark-interpreter/src/main/resources/PySpark/pyspark_runner.py @@ -15,7 +15,11 @@ # limitations under the License. # -import sys, getopt, traceback, re, ast, platform +import sys +import getopt +import traceback +import re +import ast print("PYTHON::: Starting imports") from py4j.java_gateway import java_import, JavaGateway, GatewayClient @@ -53,7 +57,7 @@ java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*") bridge = gateway.entry_point state = bridge.state() -state.markReady(platform.python_version()) +state.markReady() if sparkVersion.startswith("1.2"): java_import(gateway.jvm, "org.apache.spark.sql.SparkSession") http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/15327b58/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkInterpreter.scala ---------------------------------------------------------------------- diff --git a/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkInterpreter.scala b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkInterpreter.scala index a76e1d0..d408217 100644 --- a/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkInterpreter.scala +++ b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkInterpreter.scala @@ -41,6 +41,7 @@ class PySparkInterpreter( private val WAIT_DURATION: Long = java.util.concurrent.TimeUnit.SECONDS.toMillis(50) private val PythonExecEnv = "PYTHON_EXEC" + private lazy 
val pythonExecutable = Option(System.getenv(PythonExecEnv)).getOrElse("python") private val logger = LoggerFactory.getLogger(this.getClass) private var _kernel:KernelLike = _ @@ -67,7 +68,7 @@ class PySparkInterpreter( ) private lazy val pySparkService = new PySparkService( - Option(System.getenv(PythonExecEnv)).getOrElse("python"), + pythonExecutable, gatewayServer, pySparkBridge, pySparkProcessHandler @@ -155,18 +156,18 @@ class PySparkInterpreter( override def doQuietly[T](body: => T): T = ??? override def languageInfo: LanguageInfo = { - if ((!pySparkService.isRunning) || (!pySparkState.isReady)) { - LanguageInfo( - "python", - version = "UNKNOWN", - fileExtension = Some(".py"), - pygmentsLexer = Some("python")) - } else { - LanguageInfo( - "python", - pySparkState.getVersion(), - fileExtension = Some(".py"), - pygmentsLexer = Some("python")) - } + import scala.sys.process._ + + // Issue a subprocess call to grab the python version. This is better than polling a child process. + val version = Seq( + pythonExecutable, + "-c", + "import sys; print('{s.major}.{s.minor}.{s.micro}'.format(s=sys.version_info))").!! 
+ + LanguageInfo( + "python", + version = version, + fileExtension = Some(".py"), + pygmentsLexer = Some("python")) } } http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/15327b58/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRInterpreter.scala ---------------------------------------------------------------------- diff --git a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRInterpreter.scala b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRInterpreter.scala index 975dee1..54bf14a 100644 --- a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRInterpreter.scala +++ b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRInterpreter.scala @@ -38,6 +38,7 @@ class SparkRInterpreter( ) extends Interpreter { private val logger = LoggerFactory.getLogger(this.getClass) private var _kernel: KernelLike = _ + private val rScriptExecutable = "Rscript" // TODO: Replace hard-coded maximum queue count /** Represents the state used by this interpreter's R instance. */ @@ -61,6 +62,7 @@ class SparkRInterpreter( ) private lazy val sparkRService = new SparkRService( + rScriptExecutable, rBackend, sparkRBridge, sparkRProcessHandler @@ -139,6 +141,16 @@ class SparkRInterpreter( // Unsupported override def doQuietly[T](body: => T): T = ??? - override def languageInfo = LanguageInfo("R", "Unknown", fileExtension = Some(".R"), pygmentsLexer = Some("r")) + override def languageInfo = { + import sys.process._ + + // Issue a subprocess call to grab the R version. This is better than polling a child process. + val version = Seq( + rScriptExecutable, + "-e", + "cat(R.version$major, '.', R.version$minor, sep='', fill=TRUE)").!! 
+ + LanguageInfo("R", version = version, fileExtension = Some(".R"), pygmentsLexer = Some("r")) + } } http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/15327b58/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRProcess.scala ---------------------------------------------------------------------- diff --git a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRProcess.scala b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRProcess.scala index 0fa453f..d1c145a 100644 --- a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRProcess.scala +++ b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRProcess.scala @@ -22,6 +22,7 @@ import scala.collection.JavaConverters._ /** * Represents the R process used to evaluate SparkR code. * + * @param processName The name of the Rscript process to run. * @param sparkRBridge The bridge to use to retrieve kernel output streams * and the Spark version to be verified * @param sparkRProcessHandler The handler to use when the process fails or @@ -30,11 +31,12 @@ import scala.collection.JavaConverters._ * back to the JVM */ class SparkRProcess( + processName: String, private val sparkRBridge: SparkRBridge, private val sparkRProcessHandler: SparkRProcessHandler, private val port: Int ) extends BrokerProcess( - processName = "Rscript", + processName = processName, entryResource = "kernelR/sparkr_runner.R", otherResources = Seq("kernelR/sparkr_runner_utils.R"), brokerBridge = sparkRBridge, http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/15327b58/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRService.scala ---------------------------------------------------------------------- diff --git a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRService.scala 
b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRService.scala index f373ab2..350aee0 100644 --- a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRService.scala +++ b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRService.scala @@ -29,12 +29,14 @@ import scala.tools.nsc.interpreter._ * Represents the service that provides the high-level interface between the * JVM and R. * + * @param processName The name of the Rscript process to run. * @param rBackend The backend to start to communicate between the JVM and R * @param sparkRBridge The bridge to use for communication between the JVM and R * @param sparkRProcessHandler The handler used for events that occur with the * SparkR process */ class SparkRService( + processName: String, private val rBackend: ReflectiveRBackend, private val sparkRBridge: SparkRBridge, private val sparkRProcessHandler: SparkRProcessHandler @@ -47,6 +49,7 @@ class SparkRService( /** Represents the process used to execute R code via the bridge. */ private lazy val sparkRProcess: SparkRProcess = { val p = new SparkRProcess( + processName, sparkRBridge, sparkRProcessHandler, rBackendPort
