http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/ReflectiveRBackend.scala ---------------------------------------------------------------------- diff --git a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/ReflectiveRBackend.scala b/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/ReflectiveRBackend.scala deleted file mode 100644 index 81cb86e..0000000 --- a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/ReflectiveRBackend.scala +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.ibm.spark.kernel.interpreter.sparkr - -/** - * Provides reflective access into the backend R component that is not - * publically accessible. - */ -class ReflectiveRBackend { - private val rBackendClass = Class.forName("org.apache.spark.api.r.RBackend") - private val rBackendInstance = rBackendClass.newInstance() - - /** - * Initializes the underlying RBackend service. - * - * @return The port used by the service - */ - def init(): Int = { - val runMethod = rBackendClass.getDeclaredMethod("init") - - runMethod.invoke(rBackendInstance).asInstanceOf[Int] - } - - /** Blocks until the service has finished. */ - def run(): Unit = { - val runMethod = rBackendClass.getDeclaredMethod("run") - - runMethod.invoke(rBackendInstance) - } - - /** Closes the underlying RBackend service. */ - def close(): Unit = { - val runMethod = rBackendClass.getDeclaredMethod("close") - - runMethod.invoke(rBackendInstance) - } -}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRBridge.scala ---------------------------------------------------------------------- diff --git a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRBridge.scala b/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRBridge.scala deleted file mode 100644 index 44fa203..0000000 --- a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRBridge.scala +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.ibm.spark.kernel.interpreter.sparkr - -import com.ibm.spark.interpreter.broker.producer.{StandardSQLContextProducer, StandardJavaSparkContextProducer, JavaSparkContextProducerLike, SQLContextProducerLike} -import com.ibm.spark.interpreter.broker.{BrokerState, BrokerBridge} -import com.ibm.spark.kernel.api.KernelLike -import org.apache.spark.SparkContext - -/** - * Represents constants for the SparkR bridge. - */ -object SparkRBridge { - /** Represents the maximum amount of code that can be queued for Python. */ - val MaxQueuedCode = 500 - - /** Contains the bridge used by the current R process. */ - @volatile private var _sparkRBridge: Option[SparkRBridge] = None - - /** Allows kernel to set bridge dynamically. */ - private[sparkr] def sparkRBridge_=(newSparkRBridge: SparkRBridge): Unit = { - _sparkRBridge = Some(newSparkRBridge) - } - - /** Clears the bridge currently hosted statically. */ - private[sparkr] def reset(): Unit = _sparkRBridge = None - - /** Must be exposed in a static location for RBackend to access. */ - def sparkRBridge: SparkRBridge = { - assert(_sparkRBridge.nonEmpty, "SparkRBridge has not been initialized!") - _sparkRBridge.get - } - - /** - * Creates a new SparkRBridge instance. - * - * @param brokerState The container of broker state to expose - * @param kernel The kernel API to expose through the bridge - * - * @return The new SparkR bridge - */ - def apply( - brokerState: BrokerState, - kernel: KernelLike - ): SparkRBridge = { - new SparkRBridge( - _brokerState = brokerState, - _kernel = kernel - ) with StandardJavaSparkContextProducer with StandardSQLContextProducer - } -} - -/** - * Represents the API available to SparkR to act as the bridge for data - * between the JVM and R. - * - * @param _brokerState The container of broker state to expose - * @param _kernel The kernel API to expose through the bridge - */ -class SparkRBridge private ( - private val _brokerState: BrokerState, - private val _kernel: KernelLike -) extends BrokerBridge(_brokerState, _kernel) { - override val brokerName: String = "SparkR" -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRException.scala ---------------------------------------------------------------------- diff --git a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRException.scala b/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRException.scala deleted file mode 100644 index 0be8f61..0000000 --- a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRException.scala +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.ibm.spark.kernel.interpreter.sparkr - -import com.ibm.spark.interpreter.broker.BrokerException - -/** - * Represents a generic SparkR exception. - * - * @param message The message to associate with the exception - */ -class SparkRException(message: String) extends BrokerException(message) http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRInterpreter.scala ---------------------------------------------------------------------- diff --git a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRInterpreter.scala b/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRInterpreter.scala deleted file mode 100644 index 45fe03c..0000000 --- a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRInterpreter.scala +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.ibm.spark.kernel.interpreter.sparkr - -import java.net.URL - -import com.ibm.spark.interpreter.Results.Result -import com.ibm.spark.interpreter._ -import com.ibm.spark.kernel.api.KernelLike -import org.apache.spark.SparkContext -import org.apache.spark.sql.SQLContext -import org.slf4j.LoggerFactory - -import scala.concurrent.Await -import scala.concurrent.duration._ -import scala.tools.nsc.interpreter.{InputStream, OutputStream} - -/** - * Represents an interpreter interface to SparkR. Requires a properly-set - * SPARK_HOME pointing to a binary distribution (needs packaged SparkR library) - * and an implementation of R on the path. - * - */ -class SparkRInterpreter( -) extends Interpreter { - private val logger = LoggerFactory.getLogger(this.getClass) - private var _kernel: KernelLike = _ - - // TODO: Replace hard-coded maximum queue count - /** Represents the state used by this interpreter's R instance. */ - private lazy val sparkRState = new SparkRState(500) - - /** Represents the bridge used by this interpreter's R instance. */ - private lazy val sparkRBridge = SparkRBridge( - sparkRState, - _kernel - ) - - /** Represents the interface for R to talk to JVM Spark components. */ - private lazy val rBackend = new ReflectiveRBackend - - /** Represents the process handler used for the SparkR process. */ - private lazy val sparkRProcessHandler: SparkRProcessHandler = - new SparkRProcessHandler( - sparkRBridge, - restartOnFailure = true, - restartOnCompletion = true - ) - - private lazy val sparkRService = new SparkRService( - rBackend, - sparkRBridge, - sparkRProcessHandler - ) - private lazy val sparkRTransformer = new SparkRTransformer - - override def init(kernel: KernelLike): Interpreter = { - _kernel = kernel - this - } - - /** - * Executes the provided code with the option to silence output. - * @param code The code to execute - * @param silent Whether or not to execute the code silently (no output) - * @return The success/failure of the interpretation and the output from the - * execution or the failure - */ - override def interpret(code: String, silent: Boolean): - (Result, Either[ExecuteOutput, ExecuteFailure]) = - { - if (!sparkRService.isRunning) sparkRService.start() - - val futureResult = sparkRTransformer.transformToInterpreterResult( - sparkRService.submitCode(code) - ) - - Await.result(futureResult, Duration.Inf) - } - - /** - * Starts the interpreter, initializing any internal state. - * @return A reference to the interpreter - */ - override def start(): Interpreter = { - sparkRService.start() - - this - } - - /** - * Stops the interpreter, removing any previous internal state. - * @return A reference to the interpreter - */ - override def stop(): Interpreter = { - sparkRService.stop() - - this - } - - /** - * Returns the class loader used by this interpreter. - * - * @return The runtime class loader used by this interpreter - */ - override def classLoader: ClassLoader = this.getClass.getClassLoader - - // Unsupported (but can be invoked) - override def lastExecutionVariableName: Option[String] = None - - // Unsupported (but can be invoked) - override def read(variableName: String): Option[AnyRef] = None - - // Unsupported (but can be invoked) - override def completion(code: String, pos: Int): (Int, List[String]) = - (pos, Nil) - - // Unsupported - override def updatePrintStreams(in: InputStream, out: OutputStream, err: OutputStream): Unit = ??? - - // Unsupported - override def classServerURI: String = "" - - // Unsupported (but can be invoked) - override def bindSparkContext(sparkContext: SparkContext): Unit = {} - - // Unsupported (but can be invoked) - override def bindSqlContext(sqlContext: SQLContext): Unit = {} - - // Unsupported - override def interrupt(): Interpreter = ??? - - // Unsupported - override def bind(variableName: String, typeName: String, value: Any, modifiers: List[String]): Unit = ??? - - // Unsupported - override def addJars(jars: URL*): Unit = ??? - - // Unsupported - override def doQuietly[T](body: => T): T = ??? -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRProcess.scala ---------------------------------------------------------------------- diff --git a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRProcess.scala b/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRProcess.scala deleted file mode 100644 index 2429dc4..0000000 --- a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRProcess.scala +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.ibm.spark.kernel.interpreter.sparkr - -import com.ibm.spark.interpreter.broker.BrokerProcess -import scala.collection.JavaConverters._ - -/** - * Represents the R process used to evaluate SparkR code. - * - * @param sparkRBridge The bridge to use to retrieve kernel output streams - * and the Spark version to be verified - * @param sparkRProcessHandler The handler to use when the process fails or - * completes - * @param port The port to provide to the SparkR process to use to connect - * back to the JVM - */ -class SparkRProcess( - private val sparkRBridge: SparkRBridge, - private val sparkRProcessHandler: SparkRProcessHandler, - private val port: Int -) extends BrokerProcess( - processName = "Rscript", - entryResource = "kernelR/sparkr_runner.R", - otherResources = Seq("kernelR/sparkr_runner_utils.R", "sparkr_bundle.tar.gz"), - brokerBridge = sparkRBridge, - brokerProcessHandler = sparkRProcessHandler, - arguments = Seq( - "--default-packages=datasets,utils,grDevices,graphics,stats,methods" - ) -) { - override val brokerName: String = "SparkR" - private val sparkHome = Option(System.getenv("SPARK_HOME")) - .orElse(Option(System.getProperty("spark.home"))) - - assert(sparkHome.nonEmpty, "SparkR process requires Spark Home to be set!") - - /** - * Creates a new process environment to be used for environment variable - * retrieval by the new process. - * - * @return The map of environment variables and their respective values - */ - override protected def newProcessEnvironment(): Map[String, String] = { - val baseEnvironment = super.newProcessEnvironment() - - // Note: Adding the new map values should override the old ones - baseEnvironment ++ Map( - "SPARK_HOME" -> sparkHome.get, - "EXISTING_SPARKR_BACKEND_PORT" -> port.toString - ) - } -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRProcessHandler.scala ---------------------------------------------------------------------- diff --git a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRProcessHandler.scala b/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRProcessHandler.scala deleted file mode 100644 index d33d265..0000000 --- a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRProcessHandler.scala +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.ibm.spark.kernel.interpreter.sparkr - -import com.ibm.spark.interpreter.broker.BrokerProcessHandler - -/** - * Represents the handler for events triggered by the SparkR process. - * - * @param sparkRBridge The bridge to reset when the process fails or completes - * @param restartOnFailure If true, restarts the process if it fails - * @param restartOnCompletion If true, restarts the process if it completes - */ -class SparkRProcessHandler( - private val sparkRBridge: SparkRBridge, - private val restartOnFailure: Boolean, - private val restartOnCompletion: Boolean -) extends BrokerProcessHandler( - sparkRBridge, - restartOnFailure, - restartOnCompletion -) { - override val brokerName: String = "SparkR" -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRService.scala ---------------------------------------------------------------------- diff --git a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRService.scala b/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRService.scala deleted file mode 100644 index 71731a8..0000000 --- a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRService.scala +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.ibm.spark.kernel.interpreter.sparkr - -import java.util.concurrent.{TimeUnit, Semaphore} - -import com.ibm.spark.interpreter.broker.BrokerService -import com.ibm.spark.kernel.api.KernelLike -import com.ibm.spark.kernel.interpreter.sparkr.SparkRTypes.{Code, CodeResults} -import org.apache.spark.SparkContext -import org.slf4j.LoggerFactory - -import scala.concurrent.{future, Future} - -/** - * Represents the service that provides the high-level interface between the - * JVM and R. - * - * @param rBackend The backend to start to communicate between the JVM and R - * @param sparkRBridge The bridge to use for communication between the JVM and R - * @param sparkRProcessHandler The handler used for events that occur with the - * SparkR process - */ -class SparkRService( - private val rBackend: ReflectiveRBackend, - private val sparkRBridge: SparkRBridge, - private val sparkRProcessHandler: SparkRProcessHandler -) extends BrokerService { - private val logger = LoggerFactory.getLogger(this.getClass) - @volatile private var rBackendPort: Int = -1 - @volatile private var _isRunning: Boolean = false - override def isRunning: Boolean = _isRunning - - /** Represents the process used to execute R code via the bridge. */ - private lazy val sparkRProcess: SparkRProcess = { - val p = new SparkRProcess( - sparkRBridge, - sparkRProcessHandler, - rBackendPort - ) - - // Update handlers to correctly reset and restart the process - sparkRProcessHandler.setResetMethod(message => { - p.stop() - sparkRBridge.state.reset(message) - }) - sparkRProcessHandler.setRestartMethod(() => p.start()) - - p - } - - /** Starts the SparkR service. */ - override def start(): Unit = { - logger.debug("Initializing statically-accessible SparkR bridge") - SparkRBridge.sparkRBridge = sparkRBridge - - val initialized = new Semaphore(0) - import scala.concurrent.ExecutionContext.Implicits.global - val rBackendRun = future { - logger.debug("Initializing RBackend") - rBackendPort = rBackend.init() - logger.debug(s"RBackend running on port $rBackendPort") - initialized.release() - logger.debug("Running RBackend") - rBackend.run() - logger.debug("RBackend has finished") - } - - // Wait for backend to start before starting R process to connect - val backendTimeout = - sys.env.getOrElse("SPARKR_BACKEND_TIMEOUT", "120").toInt - if (initialized.tryAcquire(backendTimeout, TimeUnit.SECONDS)) { - // Start the R process used to execute code - logger.debug("Launching process to execute R code") - sparkRProcess.start() - _isRunning = true - } else { - // Unable to initialize, so throw an exception - throw new SparkRException( - s"Unable to initialize R backend in $backendTimeout seconds!") - } - } - - /** - * Submits code to the SparkR service to be executed and return a result. - * - * @param code The code to execute - * - * @return The result as a future to eventually return - */ - override def submitCode(code: Code): Future[CodeResults] = { - sparkRBridge.state.pushCode(code) - } - - /** Stops the running SparkR service. */ - override def stop(): Unit = { - // Stop the R process used to execute code - sparkRProcess.stop() - - // Stop the server used as an entrypoint for R - rBackend.close() - - // Clear the bridge - SparkRBridge.reset() - - _isRunning = false - } -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRState.scala ---------------------------------------------------------------------- diff --git a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRState.scala b/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRState.scala deleted file mode 100644 index 60f67a3..0000000 --- a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRState.scala +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.ibm.spark.kernel.interpreter.sparkr - -import com.ibm.spark.interpreter.broker.BrokerState - -/** - * Represents the state structure of SparkR. - * - * @param maxQueuedCode The maximum amount of code to support being queued - * at the same time for SparkR execution - */ -class SparkRState(private val maxQueuedCode: Int) - extends BrokerState(maxQueuedCode) http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRTransformer.scala ---------------------------------------------------------------------- diff --git a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRTransformer.scala b/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRTransformer.scala deleted file mode 100644 index 45c44c0..0000000 --- a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRTransformer.scala +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.ibm.spark.kernel.interpreter.sparkr - -import com.ibm.spark.interpreter.broker.BrokerTransformer - -/** - * Represents the transformer used by SparkR. - */ -class SparkRTransformer extends BrokerTransformer http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRTypes.scala ---------------------------------------------------------------------- diff --git a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRTypes.scala b/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRTypes.scala deleted file mode 100644 index 11f33c0..0000000 --- a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/SparkRTypes.scala +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.ibm.spark.kernel.interpreter.sparkr - -import com.ibm.spark.interpreter.broker.BrokerTypesProvider - -/** - * Represents all types associated with the SparkR interface. - */ -object SparkRTypes extends BrokerTypesProvider http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/package.scala ---------------------------------------------------------------------- diff --git a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/package.scala b/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/package.scala deleted file mode 100644 index 872d376..0000000 --- a/sparkr-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sparkr/package.scala +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.ibm.spark.kernel.interpreter - -import com.ibm.spark.interpreter.broker.{BrokerCode, BrokerPromise} - -/** - * Contains aliases to broker types. - */ -package object sparkr { - /** - * Represents a promise made regarding the completion of SparkR code - * execution. - */ - type SparkRPromise = BrokerPromise - - /** - * Represents a block of SparkR code to be evaluated. - */ - type SparkRCode = BrokerCode -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/com/ibm/spark/magic/builtin/SparkR.scala ---------------------------------------------------------------------- diff --git a/sparkr-interpreter/src/main/scala/com/ibm/spark/magic/builtin/SparkR.scala b/sparkr-interpreter/src/main/scala/com/ibm/spark/magic/builtin/SparkR.scala deleted file mode 100644 index 7eba136..0000000 --- a/sparkr-interpreter/src/main/scala/com/ibm/spark/magic/builtin/SparkR.scala +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.ibm.spark.magic.builtin - -import com.ibm.spark.interpreter.{ExecuteError, ExecuteAborted} -import com.ibm.spark.kernel.interpreter.sparkr.{SparkRInterpreter, SparkRException} -import com.ibm.spark.kernel.protocol.v5.MIMEType -import com.ibm.spark.magic.{CellMagicOutput, CellMagic} -import com.ibm.spark.magic.dependencies.IncludeKernel - -/** - * Represents the magic interface to use the SparkR interpreter. - */ -class SparkR extends CellMagic with IncludeKernel { - override def execute(code: String): CellMagicOutput = { - val sparkR = kernel.interpreter("SparkR") - - if (sparkR.isEmpty || sparkR.get == null) - throw new SparkRException("SparkR is not available!") - - sparkR.get match { - case sparkRInterpreter: SparkRInterpreter => - val (_, output) = sparkRInterpreter.interpret(code) - output match { - case Left(executeOutput) => - CellMagicOutput(MIMEType.PlainText -> executeOutput) - case Right(executeFailure) => executeFailure match { - case executeAborted: ExecuteAborted => - throw new SparkRException("SparkR code was aborted!") - case executeError: ExecuteError => - throw new SparkRException(executeError.value) - } - } - case otherInterpreter => - val className = otherInterpreter.getClass.getName - throw new SparkRException(s"Invalid SparkR interpreter: $className") - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/ReflectiveRBackend.scala ---------------------------------------------------------------------- diff --git a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/ReflectiveRBackend.scala b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/ReflectiveRBackend.scala new file mode 100644 index 0000000..81cb86e --- /dev/null +++ b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/ReflectiveRBackend.scala @@ -0,0 +1,50 @@ +/* + * Copyright 2015 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ibm.spark.kernel.interpreter.sparkr + +/** + * Provides reflective access into the backend R component that is not + * publically accessible. + */ +class ReflectiveRBackend { + private val rBackendClass = Class.forName("org.apache.spark.api.r.RBackend") + private val rBackendInstance = rBackendClass.newInstance() + + /** + * Initializes the underlying RBackend service. + * + * @return The port used by the service + */ + def init(): Int = { + val runMethod = rBackendClass.getDeclaredMethod("init") + + runMethod.invoke(rBackendInstance).asInstanceOf[Int] + } + + /** Blocks until the service has finished. */ + def run(): Unit = { + val runMethod = rBackendClass.getDeclaredMethod("run") + + runMethod.invoke(rBackendInstance) + } + + /** Closes the underlying RBackend service. */ + def close(): Unit = { + val runMethod = rBackendClass.getDeclaredMethod("close") + + runMethod.invoke(rBackendInstance) + } +} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRBridge.scala ---------------------------------------------------------------------- diff --git a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRBridge.scala b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRBridge.scala new file mode 100644 index 0000000..44fa203 --- /dev/null +++ b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRBridge.scala @@ -0,0 +1,78 @@ +/* + * Copyright 2015 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ibm.spark.kernel.interpreter.sparkr + +import com.ibm.spark.interpreter.broker.producer.{StandardSQLContextProducer, StandardJavaSparkContextProducer, JavaSparkContextProducerLike, SQLContextProducerLike} +import com.ibm.spark.interpreter.broker.{BrokerState, BrokerBridge} +import com.ibm.spark.kernel.api.KernelLike +import org.apache.spark.SparkContext + +/** + * Represents constants for the SparkR bridge. + */ +object SparkRBridge { + /** Represents the maximum amount of code that can be queued for Python. */ + val MaxQueuedCode = 500 + + /** Contains the bridge used by the current R process. */ + @volatile private var _sparkRBridge: Option[SparkRBridge] = None + + /** Allows kernel to set bridge dynamically. */ + private[sparkr] def sparkRBridge_=(newSparkRBridge: SparkRBridge): Unit = { + _sparkRBridge = Some(newSparkRBridge) + } + + /** Clears the bridge currently hosted statically. */ + private[sparkr] def reset(): Unit = _sparkRBridge = None + + /** Must be exposed in a static location for RBackend to access. */ + def sparkRBridge: SparkRBridge = { + assert(_sparkRBridge.nonEmpty, "SparkRBridge has not been initialized!") + _sparkRBridge.get + } + + /** + * Creates a new SparkRBridge instance. + * + * @param brokerState The container of broker state to expose + * @param kernel The kernel API to expose through the bridge + * + * @return The new SparkR bridge + */ + def apply( + brokerState: BrokerState, + kernel: KernelLike + ): SparkRBridge = { + new SparkRBridge( + _brokerState = brokerState, + _kernel = kernel + ) with StandardJavaSparkContextProducer with StandardSQLContextProducer + } +} + +/** + * Represents the API available to SparkR to act as the bridge for data + * between the JVM and R. + * + * @param _brokerState The container of broker state to expose + * @param _kernel The kernel API to expose through the bridge + */ +class SparkRBridge private ( + private val _brokerState: BrokerState, + private val _kernel: KernelLike +) extends BrokerBridge(_brokerState, _kernel) { + override val brokerName: String = "SparkR" +} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRException.scala ---------------------------------------------------------------------- diff --git a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRException.scala b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRException.scala new file mode 100644 index 0000000..0be8f61 --- /dev/null +++ b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRException.scala @@ -0,0 +1,25 @@ +/* + * Copyright 2015 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ibm.spark.kernel.interpreter.sparkr + +import com.ibm.spark.interpreter.broker.BrokerException + +/** + * Represents a generic SparkR exception. + * + * @param message The message to associate with the exception + */ +class SparkRException(message: String) extends BrokerException(message) http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRInterpreter.scala ---------------------------------------------------------------------- diff --git a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRInterpreter.scala b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRInterpreter.scala new file mode 100644 index 0000000..45fe03c --- /dev/null +++ b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRInterpreter.scala @@ -0,0 +1,154 @@ +/* + * Copyright 2015 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ibm.spark.kernel.interpreter.sparkr + +import java.net.URL + +import com.ibm.spark.interpreter.Results.Result +import com.ibm.spark.interpreter._ +import com.ibm.spark.kernel.api.KernelLike +import org.apache.spark.SparkContext +import org.apache.spark.sql.SQLContext +import org.slf4j.LoggerFactory + +import scala.concurrent.Await +import scala.concurrent.duration._ +import scala.tools.nsc.interpreter.{InputStream, OutputStream} + +/** + * Represents an interpreter interface to SparkR. Requires a properly-set + * SPARK_HOME pointing to a binary distribution (needs packaged SparkR library) + * and an implementation of R on the path. + * + */ +class SparkRInterpreter( +) extends Interpreter { + private val logger = LoggerFactory.getLogger(this.getClass) + private var _kernel: KernelLike = _ + + // TODO: Replace hard-coded maximum queue count + /** Represents the state used by this interpreter's R instance. */ + private lazy val sparkRState = new SparkRState(500) + + /** Represents the bridge used by this interpreter's R instance. */ + private lazy val sparkRBridge = SparkRBridge( + sparkRState, + _kernel + ) + + /** Represents the interface for R to talk to JVM Spark components. */ + private lazy val rBackend = new ReflectiveRBackend + + /** Represents the process handler used for the SparkR process. */ + private lazy val sparkRProcessHandler: SparkRProcessHandler = + new SparkRProcessHandler( + sparkRBridge, + restartOnFailure = true, + restartOnCompletion = true + ) + + private lazy val sparkRService = new SparkRService( + rBackend, + sparkRBridge, + sparkRProcessHandler + ) + private lazy val sparkRTransformer = new SparkRTransformer + + override def init(kernel: KernelLike): Interpreter = { + _kernel = kernel + this + } + + /** + * Executes the provided code with the option to silence output. + * @param code The code to execute + * @param silent Whether or not to execute the code silently (no output) + * @return The success/failure of the interpretation and the output from the + * execution or the failure + */ + override def interpret(code: String, silent: Boolean): + (Result, Either[ExecuteOutput, ExecuteFailure]) = + { + if (!sparkRService.isRunning) sparkRService.start() + + val futureResult = sparkRTransformer.transformToInterpreterResult( + sparkRService.submitCode(code) + ) + + Await.result(futureResult, Duration.Inf) + } + + /** + * Starts the interpreter, initializing any internal state. + * @return A reference to the interpreter + */ + override def start(): Interpreter = { + sparkRService.start() + + this + } + + /** + * Stops the interpreter, removing any previous internal state. + * @return A reference to the interpreter + */ + override def stop(): Interpreter = { + sparkRService.stop() + + this + } + + /** + * Returns the class loader used by this interpreter. + * + * @return The runtime class loader used by this interpreter + */ + override def classLoader: ClassLoader = this.getClass.getClassLoader + + // Unsupported (but can be invoked) + override def lastExecutionVariableName: Option[String] = None + + // Unsupported (but can be invoked) + override def read(variableName: String): Option[AnyRef] = None + + // Unsupported (but can be invoked) + override def completion(code: String, pos: Int): (Int, List[String]) = + (pos, Nil) + + // Unsupported + override def updatePrintStreams(in: InputStream, out: OutputStream, err: OutputStream): Unit = ??? + + // Unsupported + override def classServerURI: String = "" + + // Unsupported (but can be invoked) + override def bindSparkContext(sparkContext: SparkContext): Unit = {} + + // Unsupported (but can be invoked) + override def bindSqlContext(sqlContext: SQLContext): Unit = {} + + // Unsupported + override def interrupt(): Interpreter = ??? + + // Unsupported + override def bind(variableName: String, typeName: String, value: Any, modifiers: List[String]): Unit = ??? + + // Unsupported + override def addJars(jars: URL*): Unit = ??? + + // Unsupported + override def doQuietly[T](body: => T): T = ??? +} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRProcess.scala ---------------------------------------------------------------------- diff --git a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRProcess.scala b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRProcess.scala new file mode 100644 index 0000000..2429dc4 --- /dev/null +++ b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRProcess.scala @@ -0,0 +1,66 @@ +/* + * Copyright 2015 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ibm.spark.kernel.interpreter.sparkr + +import com.ibm.spark.interpreter.broker.BrokerProcess +import scala.collection.JavaConverters._ + +/** + * Represents the R process used to evaluate SparkR code. + * + * @param sparkRBridge The bridge to use to retrieve kernel output streams + * and the Spark version to be verified + * @param sparkRProcessHandler The handler to use when the process fails or + * completes + * @param port The port to provide to the SparkR process to use to connect + * back to the JVM + */ +class SparkRProcess( + private val sparkRBridge: SparkRBridge, + private val sparkRProcessHandler: SparkRProcessHandler, + private val port: Int +) extends BrokerProcess( + processName = "Rscript", + entryResource = "kernelR/sparkr_runner.R", + otherResources = Seq("kernelR/sparkr_runner_utils.R", "sparkr_bundle.tar.gz"), + brokerBridge = sparkRBridge, + brokerProcessHandler = sparkRProcessHandler, + arguments = Seq( + "--default-packages=datasets,utils,grDevices,graphics,stats,methods" + ) +) { + override val brokerName: String = "SparkR" + private val sparkHome = Option(System.getenv("SPARK_HOME")) + .orElse(Option(System.getProperty("spark.home"))) + + assert(sparkHome.nonEmpty, "SparkR process requires Spark Home to be set!") + + /** + * Creates a new process environment to be used for environment variable + * retrieval by the new process. + * + * @return The map of environment variables and their respective values + */ + override protected def newProcessEnvironment(): Map[String, String] = { + val baseEnvironment = super.newProcessEnvironment() + + // Note: Adding the new map values should override the old ones + baseEnvironment ++ Map( + "SPARK_HOME" -> sparkHome.get, + "EXISTING_SPARKR_BACKEND_PORT" -> port.toString + ) + } +} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRProcessHandler.scala ---------------------------------------------------------------------- diff --git a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRProcessHandler.scala b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRProcessHandler.scala new file mode 100644 index 0000000..d33d265 --- /dev/null +++ b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRProcessHandler.scala @@ -0,0 +1,37 @@ +/* + * Copyright 2015 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ibm.spark.kernel.interpreter.sparkr + +import com.ibm.spark.interpreter.broker.BrokerProcessHandler + +/** + * Represents the handler for events triggered by the SparkR process. + * + * @param sparkRBridge The bridge to reset when the process fails or completes + * @param restartOnFailure If true, restarts the process if it fails + * @param restartOnCompletion If true, restarts the process if it completes + */ +class SparkRProcessHandler( + private val sparkRBridge: SparkRBridge, + private val restartOnFailure: Boolean, + private val restartOnCompletion: Boolean +) extends BrokerProcessHandler( + sparkRBridge, + restartOnFailure, + restartOnCompletion +) { + override val brokerName: String = "SparkR" +} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRService.scala ---------------------------------------------------------------------- diff --git a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRService.scala b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRService.scala new file mode 100644 index 0000000..71731a8 --- /dev/null +++ b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRService.scala @@ -0,0 +1,121 @@ +/* + * Copyright 2015 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ibm.spark.kernel.interpreter.sparkr + +import java.util.concurrent.{TimeUnit, Semaphore} + +import com.ibm.spark.interpreter.broker.BrokerService +import com.ibm.spark.kernel.api.KernelLike +import com.ibm.spark.kernel.interpreter.sparkr.SparkRTypes.{Code, CodeResults} +import org.apache.spark.SparkContext +import org.slf4j.LoggerFactory + +import scala.concurrent.{future, Future} + +/** + * Represents the service that provides the high-level interface between the + * JVM and R. + * + * @param rBackend The backend to start to communicate between the JVM and R + * @param sparkRBridge The bridge to use for communication between the JVM and R + * @param sparkRProcessHandler The handler used for events that occur with the + * SparkR process + */ +class SparkRService( + private val rBackend: ReflectiveRBackend, + private val sparkRBridge: SparkRBridge, + private val sparkRProcessHandler: SparkRProcessHandler +) extends BrokerService { + private val logger = LoggerFactory.getLogger(this.getClass) + @volatile private var rBackendPort: Int = -1 + @volatile private var _isRunning: Boolean = false + override def isRunning: Boolean = _isRunning + + /** Represents the process used to execute R code via the bridge. */ + private lazy val sparkRProcess: SparkRProcess = { + val p = new SparkRProcess( + sparkRBridge, + sparkRProcessHandler, + rBackendPort + ) + + // Update handlers to correctly reset and restart the process + sparkRProcessHandler.setResetMethod(message => { + p.stop() + sparkRBridge.state.reset(message) + }) + sparkRProcessHandler.setRestartMethod(() => p.start()) + + p + } + + /** Starts the SparkR service. */ + override def start(): Unit = { + logger.debug("Initializing statically-accessible SparkR bridge") + SparkRBridge.sparkRBridge = sparkRBridge + + val initialized = new Semaphore(0) + import scala.concurrent.ExecutionContext.Implicits.global + val rBackendRun = future { + logger.debug("Initializing RBackend") + rBackendPort = rBackend.init() + logger.debug(s"RBackend running on port $rBackendPort") + initialized.release() + logger.debug("Running RBackend") + rBackend.run() + logger.debug("RBackend has finished") + } + + // Wait for backend to start before starting R process to connect + val backendTimeout = + sys.env.getOrElse("SPARKR_BACKEND_TIMEOUT", "120").toInt + if (initialized.tryAcquire(backendTimeout, TimeUnit.SECONDS)) { + // Start the R process used to execute code + logger.debug("Launching process to execute R code") + sparkRProcess.start() + _isRunning = true + } else { + // Unable to initialize, so throw an exception + throw new SparkRException( + s"Unable to initialize R backend in $backendTimeout seconds!") + } + } + + /** + * Submits code to the SparkR service to be executed and return a result. + * + * @param code The code to execute + * + * @return The result as a future to eventually return + */ + override def submitCode(code: Code): Future[CodeResults] = { + sparkRBridge.state.pushCode(code) + } + + /** Stops the running SparkR service. */ + override def stop(): Unit = { + // Stop the R process used to execute code + sparkRProcess.stop() + + // Stop the server used as an entrypoint for R + rBackend.close() + + // Clear the bridge + SparkRBridge.reset() + + _isRunning = false + } +} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRState.scala ---------------------------------------------------------------------- diff --git a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRState.scala b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRState.scala new file mode 100644 index 0000000..60f67a3 --- /dev/null +++ b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRState.scala @@ -0,0 +1,27 @@ +/* + * Copyright 2015 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ibm.spark.kernel.interpreter.sparkr + +import com.ibm.spark.interpreter.broker.BrokerState + +/** + * Represents the state structure of SparkR. + * + * @param maxQueuedCode The maximum amount of code to support being queued + * at the same time for SparkR execution + */ +class SparkRState(private val maxQueuedCode: Int) + extends BrokerState(maxQueuedCode) http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRTransformer.scala ---------------------------------------------------------------------- diff --git a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRTransformer.scala b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRTransformer.scala new file mode 100644 index 0000000..45c44c0 --- /dev/null +++ b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRTransformer.scala @@ -0,0 +1,23 @@ +/* + * Copyright 2015 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ibm.spark.kernel.interpreter.sparkr + +import com.ibm.spark.interpreter.broker.BrokerTransformer + +/** + * Represents the transformer used by SparkR. + */ +class SparkRTransformer extends BrokerTransformer http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRTypes.scala ---------------------------------------------------------------------- diff --git a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRTypes.scala b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRTypes.scala new file mode 100644 index 0000000..11f33c0 --- /dev/null +++ b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/SparkRTypes.scala @@ -0,0 +1,23 @@ +/* + * Copyright 2015 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ibm.spark.kernel.interpreter.sparkr + +import com.ibm.spark.interpreter.broker.BrokerTypesProvider + +/** + * Represents all types associated with the SparkR interface. + */ +object SparkRTypes extends BrokerTypesProvider http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/package.scala ---------------------------------------------------------------------- diff --git a/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/package.scala b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/package.scala new file mode 100644 index 0000000..872d376 --- /dev/null +++ b/sparkr-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sparkr/package.scala @@ -0,0 +1,34 @@ +/* + * Copyright 2015 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ibm.spark.kernel.interpreter + +import com.ibm.spark.interpreter.broker.{BrokerCode, BrokerPromise} + +/** + * Contains aliases to broker types. + */ +package object sparkr { + /** + * Represents a promise made regarding the completion of SparkR code + * execution. + */ + type SparkRPromise = BrokerPromise + + /** + * Represents a block of SparkR code to be evaluated. + */ + type SparkRCode = BrokerCode +} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sparkr-interpreter/src/main/scala/org/apache/toree/magic/builtin/SparkR.scala ---------------------------------------------------------------------- diff --git a/sparkr-interpreter/src/main/scala/org/apache/toree/magic/builtin/SparkR.scala b/sparkr-interpreter/src/main/scala/org/apache/toree/magic/builtin/SparkR.scala new file mode 100644 index 0000000..7eba136 --- /dev/null +++ b/sparkr-interpreter/src/main/scala/org/apache/toree/magic/builtin/SparkR.scala @@ -0,0 +1,52 @@ +/* + * Copyright 2015 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ibm.spark.magic.builtin + +import com.ibm.spark.interpreter.{ExecuteError, ExecuteAborted} +import com.ibm.spark.kernel.interpreter.sparkr.{SparkRInterpreter, SparkRException} +import com.ibm.spark.kernel.protocol.v5.MIMEType +import com.ibm.spark.magic.{CellMagicOutput, CellMagic} +import com.ibm.spark.magic.dependencies.IncludeKernel + +/** + * Represents the magic interface to use the SparkR interpreter. + */ +class SparkR extends CellMagic with IncludeKernel { + override def execute(code: String): CellMagicOutput = { + val sparkR = kernel.interpreter("SparkR") + + if (sparkR.isEmpty || sparkR.get == null) + throw new SparkRException("SparkR is not available!") + + sparkR.get match { + case sparkRInterpreter: SparkRInterpreter => + val (_, output) = sparkRInterpreter.interpret(code) + output match { + case Left(executeOutput) => + CellMagicOutput(MIMEType.PlainText -> executeOutput) + case Right(executeFailure) => executeFailure match { + case executeAborted: ExecuteAborted => + throw new SparkRException("SparkR code was aborted!") + case executeError: ExecuteError => + throw new SparkRException(executeError.value) + } + } + case otherInterpreter => + val className = otherInterpreter.getClass.getName + throw new SparkRException(s"Invalid SparkR interpreter: $className") + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlException.scala ---------------------------------------------------------------------- diff --git a/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlException.scala b/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlException.scala deleted file mode 100644 index 2c0b4d5..0000000 --- a/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlException.scala +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.ibm.spark.kernel.interpreter.sql - -import com.ibm.spark.interpreter.broker.BrokerException - -/** - * Represents a generic SQL exception. - * - * @param message The message to associate with the exception - */ -class SqlException(message: String) extends BrokerException(message) - http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlInterpreter.scala ---------------------------------------------------------------------- diff --git a/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlInterpreter.scala b/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlInterpreter.scala deleted file mode 100644 index 889d4a6..0000000 --- a/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlInterpreter.scala +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.ibm.spark.kernel.interpreter.sql - -import java.net.URL - -import com.ibm.spark.interpreter.{ExecuteFailure, ExecuteOutput, Interpreter} -import com.ibm.spark.interpreter.Results.Result -import com.ibm.spark.kernel.api.KernelLike -import org.apache.spark.SparkContext -import org.apache.spark.sql.SQLContext - -import scala.concurrent.duration._ -import scala.concurrent.Await -import scala.tools.nsc.interpreter.{OutputStream, InputStream} - -/** - * Represents an interpreter interface to Spark SQL. - */ -class SqlInterpreter() extends Interpreter { - private var _kernel: KernelLike = _ - private lazy val sqlService = new SqlService(_kernel) - private lazy val sqlTransformer = new SqlTransformer - - override def init(kernel: KernelLike): Interpreter = { - _kernel = kernel - this - } - - /** - * Executes the provided code with the option to silence output. - * @param code The code to execute - * @param silent Whether or not to execute the code silently (no output) - * @return The success/failure of the interpretation and the output from the - * execution or the failure - */ - override def interpret(code: String, silent: Boolean): - (Result, Either[ExecuteOutput, ExecuteFailure]) = - { - if (!sqlService.isRunning) sqlService.start() - - val futureResult = sqlTransformer.transformToInterpreterResult( - sqlService.submitCode(code) - ) - - Await.result(futureResult, Duration.Inf) - } - - /** - * Starts the interpreter, initializing any internal state. - * @return A reference to the interpreter - */ - override def start(): Interpreter = { - sqlService.start() - - this - } - - /** - * Stops the interpreter, removing any previous internal state. - * @return A reference to the interpreter - */ - override def stop(): Interpreter = { - sqlService.stop() - - this - } - - /** - * Returns the class loader used by this interpreter. - * - * @return The runtime class loader used by this interpreter - */ - override def classLoader: ClassLoader = this.getClass.getClassLoader - - // Unsupported (but can be invoked) - override def lastExecutionVariableName: Option[String] = None - - // Unsupported (but can be invoked) - override def read(variableName: String): Option[AnyRef] = None - - // Unsupported (but can be invoked) - override def completion(code: String, pos: Int): (Int, List[String]) = - (pos, Nil) - - // Unsupported - override def updatePrintStreams(in: InputStream, out: OutputStream, err: OutputStream): Unit = ??? - - // Unsupported - override def classServerURI: String = "" - - // Unsupported (but can be invoked) - override def bindSparkContext(sparkContext: SparkContext): Unit = {} - - // Unsupported (but can be invoked) - override def bindSqlContext(sqlContext: SQLContext): Unit = {} - - // Unsupported - override def interrupt(): Interpreter = ??? - - // Unsupported - override def bind(variableName: String, typeName: String, value: Any, modifiers: List[String]): Unit = ??? - - // Unsupported - override def addJars(jars: URL*): Unit = ??? - - // Unsupported - override def doQuietly[T](body: => T): T = ??? -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlService.scala ---------------------------------------------------------------------- diff --git a/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlService.scala b/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlService.scala deleted file mode 100644 index 2f2fed6..0000000 --- a/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlService.scala +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.ibm.spark.kernel.interpreter.sql - -import com.ibm.spark.kernel.api.KernelLike -import java.io.ByteArrayOutputStream - -import com.ibm.spark.interpreter.broker.BrokerService -import com.ibm.spark.kernel.interpreter.sql.SqlTypes._ -import org.apache.spark.sql.SQLContext - -import scala.concurrent.{Future, future} - -/** - * Represents the service that provides the high-level interface between the - * JVM and Spark SQL. - * - * @param kernel The SQL Context of Apache Spark to use to perform SQL - * queries - */ -class SqlService(private val kernel: KernelLike) extends BrokerService { - import scala.concurrent.ExecutionContext.Implicits.global - - @volatile private var _isRunning: Boolean = false - override def isRunning: Boolean = _isRunning - - /** - * Submits code to the broker service to be executed and return a result. - * - * @param code The code to execute - * - * @return The result as a future to eventually return - */ - override def submitCode(code: Code): Future[CodeResults] = future { - println(s"Executing: '${code.trim}'") - val result = kernel.sqlContext.sql(code.trim) - - // TODO: There is an internal method used for show called showString that - // supposedly is only for the Python API, look into why - val stringOutput = { - val outputStream = new ByteArrayOutputStream() - Console.withOut(outputStream) { - // TODO: Provide some way to change the number of records shown - result.show(10) - } - outputStream.toString("UTF-8") - } - - stringOutput - } - - /** Stops the running broker service. */ - override def stop(): Unit = _isRunning = false - - /** Starts the broker service. */ - override def start(): Unit = _isRunning = true -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlTransformer.scala ---------------------------------------------------------------------- diff --git a/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlTransformer.scala b/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlTransformer.scala deleted file mode 100644 index 114c97f..0000000 --- a/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlTransformer.scala +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.ibm.spark.kernel.interpreter.sql - -import com.ibm.spark.interpreter.broker.BrokerTransformer - -/** - * Represents the transformer used by Apache SQL. - */ -class SqlTransformer extends BrokerTransformer http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlTypes.scala ---------------------------------------------------------------------- diff --git a/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlTypes.scala b/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlTypes.scala deleted file mode 100644 index a2fbd10..0000000 --- a/sql-interpreter/src/main/scala/com/ibm/spark/kernel/interpreter/sql/SqlTypes.scala +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.ibm.spark.kernel.interpreter.sql - -import com.ibm.spark.interpreter.broker.BrokerTypesProvider - -/** - * Represents all types associated with the SQL interface. - */ -object SqlTypes extends BrokerTypesProvider http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sql-interpreter/src/main/scala/com/ibm/spark/magic/builtin/Sql.scala ---------------------------------------------------------------------- diff --git a/sql-interpreter/src/main/scala/com/ibm/spark/magic/builtin/Sql.scala b/sql-interpreter/src/main/scala/com/ibm/spark/magic/builtin/Sql.scala deleted file mode 100644 index a8f439c..0000000 --- a/sql-interpreter/src/main/scala/com/ibm/spark/magic/builtin/Sql.scala +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.ibm.spark.magic.builtin - -import com.ibm.spark.interpreter.{ExecuteError, ExecuteAborted} -import com.ibm.spark.kernel.interpreter.sql.{SqlInterpreter, SqlException} -import com.ibm.spark.kernel.protocol.v5.MIMEType -import com.ibm.spark.magic.{CellMagicOutput, CellMagic} -import com.ibm.spark.magic.dependencies.IncludeKernel - -/** - * Represents the magic interface to use the SQL interpreter. - */ -class Sql extends CellMagic with IncludeKernel { - override def execute(code: String): CellMagicOutput = { - val sparkR = kernel.interpreter("SQL") - - if (sparkR.isEmpty || sparkR.get == null) - throw new SqlException("SQL is not available!") - - sparkR.get match { - case sparkRInterpreter: SqlInterpreter => - val (_, output) = sparkRInterpreter.interpret(code) - output match { - case Left(executeOutput) => - CellMagicOutput(MIMEType.PlainText -> executeOutput) - case Right(executeFailure) => executeFailure match { - case executeAborted: ExecuteAborted => - throw new SqlException("SQL code was aborted!") - case executeError: ExecuteError => - throw new SqlException(executeError.value) - } - } - case otherInterpreter => - val className = otherInterpreter.getClass.getName - throw new SqlException(s"Invalid SQL interpreter: $className") - } - } -} - http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlException.scala ---------------------------------------------------------------------- diff --git a/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlException.scala b/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlException.scala new file mode 100644 index 0000000..2c0b4d5 --- /dev/null +++ b/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlException.scala @@ -0,0 +1,26 @@ +/* + * Copyright 2015 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ibm.spark.kernel.interpreter.sql + +import com.ibm.spark.interpreter.broker.BrokerException + +/** + * Represents a generic SQL exception. + * + * @param message The message to associate with the exception + */ +class SqlException(message: String) extends BrokerException(message) + http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlInterpreter.scala ---------------------------------------------------------------------- diff --git a/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlInterpreter.scala b/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlInterpreter.scala new file mode 100644 index 0000000..889d4a6 --- /dev/null +++ b/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlInterpreter.scala @@ -0,0 +1,122 @@ +/* + * Copyright 2015 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ibm.spark.kernel.interpreter.sql + +import java.net.URL + +import com.ibm.spark.interpreter.{ExecuteFailure, ExecuteOutput, Interpreter} +import com.ibm.spark.interpreter.Results.Result +import com.ibm.spark.kernel.api.KernelLike +import org.apache.spark.SparkContext +import org.apache.spark.sql.SQLContext + +import scala.concurrent.duration._ +import scala.concurrent.Await +import scala.tools.nsc.interpreter.{OutputStream, InputStream} + +/** + * Represents an interpreter interface to Spark SQL. + */ +class SqlInterpreter() extends Interpreter { + private var _kernel: KernelLike = _ + private lazy val sqlService = new SqlService(_kernel) + private lazy val sqlTransformer = new SqlTransformer + + override def init(kernel: KernelLike): Interpreter = { + _kernel = kernel + this + } + + /** + * Executes the provided code with the option to silence output. + * @param code The code to execute + * @param silent Whether or not to execute the code silently (no output) + * @return The success/failure of the interpretation and the output from the + * execution or the failure + */ + override def interpret(code: String, silent: Boolean): + (Result, Either[ExecuteOutput, ExecuteFailure]) = + { + if (!sqlService.isRunning) sqlService.start() + + val futureResult = sqlTransformer.transformToInterpreterResult( + sqlService.submitCode(code) + ) + + Await.result(futureResult, Duration.Inf) + } + + /** + * Starts the interpreter, initializing any internal state. + * @return A reference to the interpreter + */ + override def start(): Interpreter = { + sqlService.start() + + this + } + + /** + * Stops the interpreter, removing any previous internal state. + * @return A reference to the interpreter + */ + override def stop(): Interpreter = { + sqlService.stop() + + this + } + + /** + * Returns the class loader used by this interpreter. + * + * @return The runtime class loader used by this interpreter + */ + override def classLoader: ClassLoader = this.getClass.getClassLoader + + // Unsupported (but can be invoked) + override def lastExecutionVariableName: Option[String] = None + + // Unsupported (but can be invoked) + override def read(variableName: String): Option[AnyRef] = None + + // Unsupported (but can be invoked) + override def completion(code: String, pos: Int): (Int, List[String]) = + (pos, Nil) + + // Unsupported + override def updatePrintStreams(in: InputStream, out: OutputStream, err: OutputStream): Unit = ??? + + // Unsupported + override def classServerURI: String = "" + + // Unsupported (but can be invoked) + override def bindSparkContext(sparkContext: SparkContext): Unit = {} + + // Unsupported (but can be invoked) + override def bindSqlContext(sqlContext: SQLContext): Unit = {} + + // Unsupported + override def interrupt(): Interpreter = ??? + + // Unsupported + override def bind(variableName: String, typeName: String, value: Any, modifiers: List[String]): Unit = ??? + + // Unsupported + override def addJars(jars: URL*): Unit = ??? + + // Unsupported + override def doQuietly[T](body: => T): T = ??? +} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlService.scala ---------------------------------------------------------------------- diff --git a/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlService.scala b/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlService.scala new file mode 100644 index 0000000..2f2fed6 --- /dev/null +++ b/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlService.scala @@ -0,0 +1,70 @@ +/* + * Copyright 2015 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ibm.spark.kernel.interpreter.sql + +import com.ibm.spark.kernel.api.KernelLike +import java.io.ByteArrayOutputStream + +import com.ibm.spark.interpreter.broker.BrokerService +import com.ibm.spark.kernel.interpreter.sql.SqlTypes._ +import org.apache.spark.sql.SQLContext + +import scala.concurrent.{Future, future} + +/** + * Represents the service that provides the high-level interface between the + * JVM and Spark SQL. + * + * @param kernel The SQL Context of Apache Spark to use to perform SQL + * queries + */ +class SqlService(private val kernel: KernelLike) extends BrokerService { + import scala.concurrent.ExecutionContext.Implicits.global + + @volatile private var _isRunning: Boolean = false + override def isRunning: Boolean = _isRunning + + /** + * Submits code to the broker service to be executed and return a result. + * + * @param code The code to execute + * + * @return The result as a future to eventually return + */ + override def submitCode(code: Code): Future[CodeResults] = future { + println(s"Executing: '${code.trim}'") + val result = kernel.sqlContext.sql(code.trim) + + // TODO: There is an internal method used for show called showString that + // supposedly is only for the Python API, look into why + val stringOutput = { + val outputStream = new ByteArrayOutputStream() + Console.withOut(outputStream) { + // TODO: Provide some way to change the number of records shown + result.show(10) + } + outputStream.toString("UTF-8") + } + + stringOutput + } + + /** Stops the running broker service. */ + override def stop(): Unit = _isRunning = false + + /** Starts the broker service. */ + override def start(): Unit = _isRunning = true +} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlTransformer.scala ---------------------------------------------------------------------- diff --git a/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlTransformer.scala b/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlTransformer.scala new file mode 100644 index 0000000..114c97f --- /dev/null +++ b/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlTransformer.scala @@ -0,0 +1,23 @@ +/* + * Copyright 2015 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ibm.spark.kernel.interpreter.sql + +import com.ibm.spark.interpreter.broker.BrokerTransformer + +/** + * Represents the transformer used by Apache SQL. + */ +class SqlTransformer extends BrokerTransformer http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlTypes.scala ---------------------------------------------------------------------- diff --git a/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlTypes.scala b/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlTypes.scala new file mode 100644 index 0000000..a2fbd10 --- /dev/null +++ b/sql-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/sql/SqlTypes.scala @@ -0,0 +1,23 @@ +/* + * Copyright 2015 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ibm.spark.kernel.interpreter.sql + +import com.ibm.spark.interpreter.broker.BrokerTypesProvider + +/** + * Represents all types associated with the SQL interface. + */ +object SqlTypes extends BrokerTypesProvider