http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/org/apache/toree/interpreter/Results.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/org/apache/toree/interpreter/Results.scala b/kernel-api/src/main/scala/org/apache/toree/interpreter/Results.scala new file mode 100644 index 0000000..8bd12d0 --- /dev/null +++ b/kernel-api/src/main/scala/org/apache/toree/interpreter/Results.scala @@ -0,0 +1,38 @@ +/* + * Copyright 2015 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ibm.spark.interpreter + +/** + * Represents interpreter results, mostly taken from the + * tools.nsc.interpreter.Results object. + */ +object Results { + abstract sealed class Result + + /** The line was interpreted successfully. */ + case object Success extends Result { override def toString = "success" } + + /** The line was erroneous in some way. */ + case object Error extends Result { override def toString = "error" } + + /** The input was incomplete. The caller should request more input. */ + case object Incomplete extends Result { override def toString = "incomplete" } + + /** The line was aborted before completed. */ + case object Aborted extends Result { override def toString = "aborted" } +} +
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerBridge.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerBridge.scala b/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerBridge.scala new file mode 100644 index 0000000..94b9a24 --- /dev/null +++ b/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerBridge.scala @@ -0,0 +1,46 @@ +/* + * Copyright 2015 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ibm.spark.interpreter.broker + +import com.ibm.spark.interpreter.broker.producer.{SQLContextProducerLike, JavaSparkContextProducerLike} +import com.ibm.spark.kernel.api.KernelLike +import org.apache.spark.api.java.JavaSparkContext +import org.apache.spark.sql.SQLContext +import org.apache.spark.{SparkConf, SparkContext} + +/** + * Represents the API available to the broker to act as the bridge for data + * between the JVM and some external process. + * + * @param _brokerState The container of broker state to expose + * @param _kernel The kernel API to expose through the bridge + */ +class BrokerBridge( + private val _brokerState: BrokerState, + private val _kernel: KernelLike +) extends BrokerName { + /** + * Represents the current state of the broker. + */ + val state: BrokerState = _brokerState + + /** + * Represents the kernel API available. + */ + val kernel: KernelLike = _kernel +} + http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerCode.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerCode.scala b/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerCode.scala new file mode 100644 index 0000000..e480aa8 --- /dev/null +++ b/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerCode.scala @@ -0,0 +1,28 @@ +/* + * Copyright 2015 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ibm.spark.interpreter.broker + +import BrokerTypes._ + +/** + * Represents a block of code to be evaluated. + * + * @param codeId The id to associate with the code to be executed + * @param code The code to evaluate using the broker + */ +case class BrokerCode(codeId: CodeId, code: Code) + http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerException.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerException.scala b/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerException.scala new file mode 100644 index 0000000..b059552 --- /dev/null +++ b/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerException.scala @@ -0,0 +1,25 @@ +/* + * Copyright 2015 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ibm.spark.interpreter.broker + +/** + * Represents a generic broker exception. + * + * @param message The message to associate with the exception + */ +class BrokerException(message: String) extends Throwable(message) + http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerName.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerName.scala b/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerName.scala new file mode 100644 index 0000000..1482ade --- /dev/null +++ b/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerName.scala @@ -0,0 +1,26 @@ +/* + * Copyright 2015 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ibm.spark.interpreter.broker + +/** + * Represents the interface that associates a name with a broker. Can be + * overridden to change name of broker in subclassing. + */ +trait BrokerName { + /** The name of the broker. */ + val brokerName: String = "broker" +} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerProcess.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerProcess.scala b/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerProcess.scala new file mode 100644 index 0000000..5072b92 --- /dev/null +++ b/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerProcess.scala @@ -0,0 +1,220 @@ +/* + * Copyright 2015 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ibm.spark.interpreter.broker + +import java.io.{OutputStream, InputStream, File, FileOutputStream} + +import org.apache.commons.exec._ +import org.apache.commons.exec.environment.EnvironmentUtils +import org.apache.commons.io.{FilenameUtils, IOUtils} +import org.slf4j.LoggerFactory +import scala.collection.JavaConverters._ + +/** + * Represents the process used to evaluate broker code. + * + * @param processName The name of the process to invoke + * @param entryResource The resource to be copied and fed as the first argument + * to the process + * @param otherResources Other resources to be included in the same directory + * as the main resource + * @param brokerBridge The bridge to use to retrieve kernel output streams + * and the Spark version to be verified + * @param brokerProcessHandler The handler to use when the process fails or + * completes + * @param arguments The collection of additional arguments to pass to the + * process after the main entrypoint + */ +class BrokerProcess( + private val processName: String, + private val entryResource: String, + private val otherResources: Seq[String], + private val brokerBridge: BrokerBridge, + private val brokerProcessHandler: BrokerProcessHandler, + private val arguments: Seq[String] = Nil +) extends BrokerName { + require(processName != null && processName.trim.nonEmpty, + "Process name cannot be null or pure whitespace!") + require(entryResource != null && entryResource.trim.nonEmpty, + "Entry resource cannot be null or pure whitespace!") + + private val logger = LoggerFactory.getLogger(this.getClass) + private val classLoader = this.getClass.getClassLoader + private val outputDir = + s"kernel-$brokerName-" + java.util.UUID.randomUUID().toString + + /** Represents the current process being executed. */ + @volatile private[broker] var currentExecutor: Option[Executor] = None + + /** + * Returns the temporary directory to place any files needed for the process. + * + * @return The directory path as a string + */ + protected def getTmpDirectory: String = System.getProperty("java.io.tmpdir") + + /** + * Returns the subdirectory to use to place any files needed for the process. + * + * @return The directory path as a string + */ + protected lazy val getSubDirectory: String = + s"kernel-$brokerName-" + java.util.UUID.randomUUID().toString + + /** + * Copies a resource from an input stream to an output stream. + * + * @param inputStream The input stream to copy from + * @param outputStream The output stream to copy to + * + * @return The result of the copy operation + */ + protected def copy(inputStream: InputStream, outputStream: OutputStream) = + IOUtils.copy(inputStream, outputStream) + + /** + * Copies a file from the kernel resources to the temporary directory. + * + * @param resource The resource to copy + * + * @return The string path pointing to the resource's destination + */ + protected def copyResourceToTmp(resource: String): String = { + val brokerRunnerResourceStream = classLoader.getResourceAsStream(resource) + + val tmpDirectory = Option(getTmpDirectory) + .getOrElse(throw new BrokerException("java.io.tmpdir is not set!")) + val subDirectory = Option(getSubDirectory).getOrElse("") + val outputName = FilenameUtils.getName(resource) + + val outputDir = Seq(tmpDirectory, subDirectory) + .filter(_.trim.nonEmpty).mkString("/") + val outputScript = new File(FilenameUtils.concat(outputDir, outputName)) + + // If our script destination is a directory, we cannot copy the script + if (outputScript.exists() && outputScript.isDirectory) + throw new BrokerException(s"Failed to create script: $outputScript") + + // Ensure that all of the directories leading up to the script exist + val outputDirFile = new File(outputDir) + if (!outputDirFile.exists()) outputDirFile.mkdirs() + + // Copy the script to the specified temporary destination + val outputScriptStream = new FileOutputStream(outputScript) + copy( + brokerRunnerResourceStream, + outputScriptStream + ) + outputScriptStream.close() + + // Return the destination of the script + val destination = outputScript.getPath + logger.debug(s"Successfully copied $resource to $destination") + destination + } + + /** + * Creates a new process environment to be used for environment variable + * retrieval by the new process. + * + * @return The map of environment variables and their respective values + */ + protected def newProcessEnvironment(): Map[String, String] = { + val procEnvironment = EnvironmentUtils.getProcEnvironment + + procEnvironment.asScala.toMap + } + + /** + * Creates a new executor to be used to launch the process. + * + * @return The executor to start and manage the process + */ + protected def newExecutor(): Executor = new DefaultExecutor + + /** + * Starts the Broker process. + */ + def start(): Unit = currentExecutor.synchronized { + assert(currentExecutor.isEmpty, "Process has already been started!") + + val capitalizedBrokerName = brokerName.capitalize + + val script = copyResourceToTmp(entryResource) + logger.debug(s"New $brokerName script created: $script") + + val createdResources = otherResources.map(copyResourceToTmp) + + // Verify that all files were successfully created + val createdResult = (script +: createdResources).map(new File(_)).map(f => { + if (f.exists()) true + else { + val resource = f.getPath + logger.warn(s"Failed to create resource: $resource") + false + } + }).forall(_ == true) + if (!createdResult) throw new BrokerException( + s"Failed to create resources for $capitalizedBrokerName" + ) + + val commandLine = CommandLine + .parse(processName) + .addArgument(script) + arguments.foreach(commandLine.addArgument) + + logger.debug(s"$capitalizedBrokerName command: ${commandLine.toString}") + + val executor = newExecutor() + + // TODO: Figure out how to dynamically update the output stream used + // to use kernel.out, kernel.err, and kernel.in + // NOTE: Currently mapping to standard output/input, which will be caught + // by our system and redirected through the kernel to the client + executor.setStreamHandler(new PumpStreamHandler( + System.out, + System.err, + System.in + )) + + // Marking exit status of 1 as successful exit + executor.setExitValue(1) + + // Prevent the runner from being killed due to run time as it is a + // long-term process + executor.setWatchdog(new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT)) + + val processEnvironment = newProcessEnvironment().asJava + logger.debug(s"$capitalizedBrokerName environment: $processEnvironment") + + // Start the process using the environment provided to the parent + executor.execute(commandLine, processEnvironment, brokerProcessHandler) + + currentExecutor = Some(executor) + } + + /** + * Stops the Broker process. + */ + def stop(): Unit = currentExecutor.synchronized { + currentExecutor.foreach(executor => { + logger.debug(s"Stopping $brokerName process") + executor.getWatchdog.destroyProcess() + }) + currentExecutor = None + } +} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerProcessHandler.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerProcessHandler.scala b/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerProcessHandler.scala new file mode 100644 index 0000000..704f974 --- /dev/null +++ b/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerProcessHandler.scala @@ -0,0 +1,70 @@ +/* + * Copyright 2015 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ibm.spark.interpreter.broker + +import org.apache.commons.exec.{ExecuteException, ExecuteResultHandler} +import org.slf4j.LoggerFactory + +/** + * Represents the handler for events triggered by the broker process. + * + * @param brokerBridge The bridge to reset when the process fails or completes + * @param restartOnFailure If true, restarts the process if it fails + * @param restartOnCompletion If true, restarts the process if it completes + */ +class BrokerProcessHandler( + private val brokerBridge: BrokerBridge, + private val restartOnFailure: Boolean, + private val restartOnCompletion: Boolean +) extends ExecuteResultHandler with BrokerName { + private val logger = LoggerFactory.getLogger(this.getClass) + private val capitalizedBrokerName = brokerName.capitalize + private val resetMessage = s"$capitalizedBrokerName was reset!" + + private var performReset: String => Unit = (_) => {} + private var performRestart: () => Unit = () => {} + + /** + * Sets the reset method used when a reset of the process is asked. + * + * @param resetMethod The method to use for resetting the process + */ + def setResetMethod(resetMethod: String => Unit): Unit = + performReset = resetMethod + + /** + * Sets the restart method used when a restart of the process is asked. + * + * @param restartMethod The method to use for restarting the process + */ + def setRestartMethod(restartMethod: () => Unit): Unit = + performRestart = restartMethod + + override def onProcessFailed(ex: ExecuteException): Unit = { + logger.error(s"$capitalizedBrokerName process failed: $ex") + performReset(resetMessage) + + if (restartOnFailure) performRestart() + } + + override def onProcessComplete(exitValue: Int): Unit = { + logger.error(s"$capitalizedBrokerName process exited: $exitValue") + performReset(resetMessage) + + if (restartOnCompletion) performRestart() + } +} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerPromise.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerPromise.scala b/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerPromise.scala new file mode 100644 index 0000000..3fe96bf --- /dev/null +++ b/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerPromise.scala @@ -0,0 +1,29 @@ +/* + * Copyright 2015 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ibm.spark.interpreter.broker + +import com.ibm.spark.interpreter.broker.BrokerTypes.{CodeResults, CodeId} + +import scala.concurrent.Promise + +/** + * Represents a promise made regarding the completion of broker code execution. + * + * @param codeId The id of the code that was executed + * @param promise The promise to be fulfilled when the code finishes executing + */ +case class BrokerPromise(codeId: CodeId, promise: Promise[CodeResults]) http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerService.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerService.scala b/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerService.scala new file mode 100644 index 0000000..27430af --- /dev/null +++ b/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerService.scala @@ -0,0 +1,48 @@ +/* + * Copyright 2015 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ibm.spark.interpreter.broker + +import com.ibm.spark.interpreter.broker.BrokerTypes.{Code, CodeResults} +import scala.concurrent.Future + +/** + * Represents the service that provides the high-level interface between the + * JVM and another process. + */ +trait BrokerService { + /** Starts the broker service. */ + def start(): Unit + + /** + * Indicates whether or not the service is running. + * + * @return True if running, otherwise false + */ + def isRunning: Boolean + + /** + * Submits code to the broker service to be executed and return a result. + * + * @param code The code to execute + * + * @return The result as a future to eventually return + */ + def submitCode(code: Code): Future[CodeResults] + + /** Stops the running broker service. */ + def stop(): Unit +} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerState.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerState.scala b/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerState.scala new file mode 100644 index 0000000..409d789 --- /dev/null +++ b/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerState.scala @@ -0,0 +1,176 @@ +/* + * Copyright 2015 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ibm.spark.interpreter.broker + +import java.util.concurrent.ConcurrentHashMap + +import com.ibm.spark.interpreter.broker.BrokerTypes._ +import org.slf4j.LoggerFactory + +import scala.concurrent.{Future, promise} + +/** + * Represents the state structure of broker. + * + * @param maxQueuedCode The maximum amount of code to support being queued + * at the same time for broker execution + * + */ +class BrokerState(private val maxQueuedCode: Int) { + private val logger = LoggerFactory.getLogger(this.getClass) + + import scala.collection.JavaConverters._ + + private var _isReady: Boolean = false + protected val codeQueue: java.util.Queue[BrokerCode] = + new java.util.concurrent.ConcurrentLinkedQueue[BrokerCode]() + protected val promiseMap: collection.mutable.Map[CodeId, BrokerPromise] = + new ConcurrentHashMap[CodeId, BrokerPromise]().asScala + + /** + * Adds new code to eventually be executed. + * + * @param code The snippet of code to execute + * + * @return The future containing the results of the execution + */ + def pushCode(code: Code): Future[CodeResults] = synchronized { + // Throw the standard error if our maximum limit has been reached + if (codeQueue.size() >= maxQueuedCode) + throw new IllegalStateException( + s"Code limit of $maxQueuedCode has been reached!") + + // Generate our promise that will be fulfilled when the code is executed + // and the results are sent back + val codeExecutionPromise = promise[CodeResults]() + + // Build the code representation to send to Broker + val uniqueId = java.util.UUID.randomUUID().toString + val brokerCode = BrokerCode(uniqueId, code) + val brokerPromise = BrokerPromise(uniqueId, codeExecutionPromise) + + logger.debug(s"Queueing '$code' with id '$uniqueId' to run with broker") + + // Add the code to be executed to our queue and the promise to our map + codeQueue.add(brokerCode) + promiseMap.put(brokerPromise.codeId, brokerPromise) + + codeExecutionPromise.future + } + + /** + * Returns the total code currently queued to be executed. + * + * @return The total number of code instances queued to be executed + */ + def totalQueuedCode(): Int = codeQueue.size() + + /** + * Retrieves (and removes) the next piece of code to be executed. + * + * @note This should only be invoked by the broker process! + * + * @return The next code to execute if available, otherwise null + */ + def nextCode(): BrokerCode = { + val brokerCode = codeQueue.poll() + + if (brokerCode != null) + logger.trace(s"Sending $brokerCode to Broker runner") + + brokerCode + } + + /** + * Indicates whether or not the broker instance is ready for code. + * + * @return True if it is ready, otherwise false + */ + def isReady: Boolean = _isReady + + /** + * Marks the state of broker as ready. + */ + def markReady(): Unit = _isReady = true + + /** + * Marks the specified code as successfully completed using its id. + * + * @param codeId The id of the code to mark as a success + * @param output The output from the execution to be used as the result + */ + def markSuccess(codeId: CodeId, output: CodeResults): Unit = { + logger.debug(s"Received success for code with id '$codeId': $output") + promiseMap.remove(codeId).foreach(_.promise.success(output)) + } + + /** + * Marks the specified code as successfully completed using its id. Output + * from success is treated as an empty string. + * + * @param codeId The id of the code to mark as a success + */ + def markSuccess(codeId: CodeId): Unit = markSuccess(codeId, "") + + /** + * Marks the specified code as unsuccessful using its id. + * + * @param codeId The id of the code to mark as a failure + * @param output The output from the error to be used as the description + * of the exception + */ + def markFailure(codeId: CodeId, output: CodeResults): Unit = { + logger.debug(s"Received failure for code with id '$codeId': $output") + promiseMap.remove(codeId).foreach( + _.promise.failure(new BrokerException(output))) + } + + /** + * Marks the specified code as unsuccessful using its id. Output from failure + * is treated as an empty string. + * + * @param codeId The id of the code to mark as a failure + */ + def markFailure(codeId: CodeId): Unit = markFailure(codeId, "") + + /** + * Resets the state by clearing any pending code executions and marking all + * pending executions as failures (or success if specified). + * + * @param message The message to present through the interrupted promises + * @param markAllAsFailure If true, marks all pending executions as failures, + * otherwise marks all as success + */ + def reset(message: String, markAllAsFailure: Boolean = true): Unit = { + codeQueue.synchronized { + promiseMap.synchronized { + codeQueue.clear() + + // Use map contents for reset as it should contain non-executing + // code as well as executing code + promiseMap.foreach { case (codeId, codePromise) => + if (markAllAsFailure) + codePromise.promise.failure(new BrokerException(message)) + else + codePromise.promise.success(message) + } + promiseMap.clear() + } + } + } +} + http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerTransformer.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerTransformer.scala b/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerTransformer.scala new file mode 100644 index 0000000..aa18648 --- /dev/null +++ b/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerTransformer.scala @@ -0,0 +1,54 @@ +/* + * Copyright 2015 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ibm.spark.interpreter.broker + +import com.ibm.spark.interpreter.InterpreterTypes.ExecuteOutput +import com.ibm.spark.interpreter.Results.Result +import com.ibm.spark.interpreter.broker.BrokerTypes.CodeResults +import com.ibm.spark.interpreter.{ExecuteError, ExecuteFailure, Results} + +import scala.concurrent.Future + +/** + * Represents a utility that can transform raw broker information to + * kernel information. + */ +class BrokerTransformer { + /** + * Transforms a pure result containing output information into a form that + * the interpreter interface expects. + * + * @param futureResult The raw result as a future + * + * @return The transformed result as a future + */ + def transformToInterpreterResult(futureResult: Future[CodeResults]): + Future[(Result, Either[ExecuteOutput, ExecuteFailure])] = + { + import scala.concurrent.ExecutionContext.Implicits.global + + futureResult + .map(results => (Results.Success, Left(results))) + .recover({ case ex: BrokerException => + (Results.Error, Right(ExecuteError( + name = ex.getClass.getName, + value = ex.getLocalizedMessage, + stackTrace = ex.getStackTrace.map(_.toString).toList + ))) + }) + } +} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerTypes.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerTypes.scala b/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerTypes.scala new file mode 100644 index 0000000..71e4d3d --- /dev/null +++ b/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerTypes.scala @@ -0,0 +1,22 @@ +/* + * Copyright 2015 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ibm.spark.interpreter.broker + +/** + * Represents all types associated with the broker interface. + */ +object BrokerTypes extends BrokerTypesProvider http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerTypesProvider.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerTypesProvider.scala b/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerTypesProvider.scala new file mode 100644 index 0000000..2af47e4 --- /dev/null +++ b/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerTypesProvider.scala @@ -0,0 +1,31 @@ +/* + * Copyright 2015 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ibm.spark.interpreter.broker + +/** + * Provides broker types to the class/trait that implements this trait. + */ +trait BrokerTypesProvider { + /** Represents the id used to keep track of executing code. */ + type CodeId = String + + /** Represents the code to execute. */ + type Code = String + + /** Represents the results of code execution or the failure message. */ + type CodeResults = String +} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/producer/JavaSparkContextProducerLike.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/producer/JavaSparkContextProducerLike.scala b/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/producer/JavaSparkContextProducerLike.scala new file mode 100644 index 0000000..cda61f3 --- /dev/null +++ b/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/producer/JavaSparkContextProducerLike.scala @@ -0,0 +1,42 @@ +/* + * Copyright 2015 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ibm.spark.interpreter.broker.producer + +import org.apache.spark.SparkContext +import org.apache.spark.api.java.JavaSparkContext + +/** + * Represents a producer for a JavaSparkContext. + */ +trait JavaSparkContextProducerLike { + /** + * Creates a new JavaSparkContext instance. + * + * @param sparkContext The SparkContext instance to use to create the Java one + * + * @return The new JavaSparkContext + */ + def newJavaSparkContext(sparkContext: SparkContext): JavaSparkContext +} + +/** + * Represents the standard producer for a JavaSparkContext. + */ +trait StandardJavaSparkContextProducer extends JavaSparkContextProducerLike { + def newJavaSparkContext(sparkContext: SparkContext): JavaSparkContext = + new JavaSparkContext(sparkContext) +} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/producer/SQLContextProducerLike.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/producer/SQLContextProducerLike.scala b/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/producer/SQLContextProducerLike.scala new file mode 100644 index 0000000..fd46268 --- /dev/null +++ b/kernel-api/src/main/scala/org/apache/toree/interpreter/broker/producer/SQLContextProducerLike.scala @@ -0,0 +1,42 @@ +/* + * Copyright 2015 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ibm.spark.interpreter.broker.producer + +import org.apache.spark.SparkContext +import org.apache.spark.sql.SQLContext + +/** + * Represents a producer for a SQLContext. + */ +trait SQLContextProducerLike { + /** + * Creates a new SQLContext instance. + * + * @param sparkContext The SparkContext instance to use to create the SQL one + * + * @return The new SQLContext + */ + def newSQLContext(sparkContext: SparkContext): SQLContext +} + +/** + * Represents the standard producer for a SQLContext. + */ +trait StandardSQLContextProducer extends SQLContextProducerLike { + def newSQLContext(sparkContext: SparkContext): SQLContext = + new SQLContext(sparkContext) +} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/org/apache/toree/interpreter/imports/printers/WrapperConsole.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/org/apache/toree/interpreter/imports/printers/WrapperConsole.scala b/kernel-api/src/main/scala/org/apache/toree/interpreter/imports/printers/WrapperConsole.scala new file mode 100644 index 0000000..42c5616 --- /dev/null +++ b/kernel-api/src/main/scala/org/apache/toree/interpreter/imports/printers/WrapperConsole.scala @@ -0,0 +1,47 @@ +/* + * Copyright 2015 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ibm.spark.interpreter.imports.printers + +import java.io._ + +import com.ibm.spark.utils.DynamicReflectionSupport + +/** + * Represents a wrapper for the scala.Console for Scala 2.10.4 implementation. + * @param in The input stream used for standard in + * @param out The output stream used for standard out + * @param err The output stream used for standard error + */ +class WrapperConsole( + val in: BufferedReader, + val out: PrintStream, + val err: PrintStream +) extends DynamicReflectionSupport(Class.forName("scala.Console$"), scala.Console) { + require(in != null) + require(out != null) + require(err != null) + + // + // SUPPORTED PRINT OPERATIONS + // + + def print(obj: Any): Unit = out.print(obj) + def printf(text: String, args: Any*): Unit = + out.print(text.format(args: _*)) + def println(x: Any): Unit = out.println(x) + def println(): Unit = out.println() +} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/org/apache/toree/interpreter/imports/printers/WrapperSystem.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/org/apache/toree/interpreter/imports/printers/WrapperSystem.scala b/kernel-api/src/main/scala/org/apache/toree/interpreter/imports/printers/WrapperSystem.scala new file mode 100644 index 0000000..4583680 --- /dev/null +++ b/kernel-api/src/main/scala/org/apache/toree/interpreter/imports/printers/WrapperSystem.scala @@ -0,0 +1,48 @@ +/* + * Copyright 2015 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ibm.spark.interpreter.imports.printers + +import java.io._ + +import com.ibm.spark.utils.DynamicReflectionSupport + +/** + * Represents a wrapper for java.lang.System. + * @param inStream The input stream used for standard in + * @param outStream The output stream used for standard out + * @param errStream The output stream used for standard error + */ +class WrapperSystem( + private val inStream: InputStream, + private val outStream: OutputStream, + private val errStream: OutputStream +) extends DynamicReflectionSupport(Class.forName("java.lang.System"), null){ + require(inStream != null) + require(outStream != null) + require(errStream != null) + + private val outPrinter = new PrintStream(outStream) + private val errPrinter = new PrintStream(errStream) + + // + // MASKED METHODS + // + + def in = inStream + def out = outPrinter + def err = errPrinter +} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/org/apache/toree/interpreter/package.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/org/apache/toree/interpreter/package.scala b/kernel-api/src/main/scala/org/apache/toree/interpreter/package.scala new file mode 100644 index 0000000..451316d --- /dev/null +++ b/kernel-api/src/main/scala/org/apache/toree/interpreter/package.scala @@ -0,0 +1,27 @@ +/* + * Copyright 2015 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ibm.spark + +// TODO: Deprecate and remove this package object as it is difficult to +// remember where this type comes from +package object interpreter { + /** + * Represents the output from an interpret execution. + */ + type ExecuteOutput = String + +} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/org/apache/toree/kernel/api/FactoryMethodsLike.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/org/apache/toree/kernel/api/FactoryMethodsLike.scala b/kernel-api/src/main/scala/org/apache/toree/kernel/api/FactoryMethodsLike.scala new file mode 100644 index 0000000..1642e1b --- /dev/null +++ b/kernel-api/src/main/scala/org/apache/toree/kernel/api/FactoryMethodsLike.scala @@ -0,0 +1,34 @@ +package com.ibm.spark.kernel.api + +import java.io.{InputStream, OutputStream} + +/** + * Represents the methods available to create objects related to the kernel. + */ +trait FactoryMethodsLike { + /** + * Creates a new kernel output stream. + * + * @param streamType The type of output stream (stdout/stderr) + * @param sendEmptyOutput If true, will send message even if output is empty + * + * @return The new KernelOutputStream instance + */ + def newKernelOutputStream( + streamType: String, + sendEmptyOutput: Boolean + ): OutputStream + + /** + * Creates a new kernel input stream. + * + * @param prompt The text to use as a prompt + * @param password If true, should treat input as a password field + * + * @return The new KernelInputStream instance + */ + def newKernelInputStream( + prompt: String, + password: Boolean + ): InputStream +} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/org/apache/toree/kernel/api/KernelLike.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/org/apache/toree/kernel/api/KernelLike.scala b/kernel-api/src/main/scala/org/apache/toree/kernel/api/KernelLike.scala new file mode 100644 index 0000000..c9442aa --- /dev/null +++ b/kernel-api/src/main/scala/org/apache/toree/kernel/api/KernelLike.scala @@ -0,0 +1,106 @@ +/* + * Copyright 2014 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ibm.spark.kernel.api + +import java.io.{PrintStream, InputStream, OutputStream} + +import com.typesafe.config.Config +import org.apache.spark.api.java.JavaSparkContext +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.sql.SQLContext + +/** + * Interface for the kernel API. This does not include exposed variables. + */ +trait KernelLike { + + def createSparkContext(conf: SparkConf): SparkContext + + def createSparkContext(master: String, appName: String): SparkContext + + /** + * Executes a block of code represented as a string and returns the result. + * + * @param code The code as an option to execute + * + * @return A tuple containing the result (true/false) and the output as a + * string + */ + def eval(code: Option[String]): (Boolean, String) + + /** + * Returns a collection of methods that can be used to generate objects + * related to the kernel. + * + * @return The collection of factory methods + */ + def factory: FactoryMethodsLike + + /** + * Returns a collection of methods that can be used to stream data from the + * kernel to the client. + * + * @return The collection of stream methods + */ + def stream: StreamMethodsLike + + /** + * Returns a print stream to be used for communication back to clients + * via standard out. + * + * @return The print stream instance or an error if the stream info is + * not found + */ + def out: PrintStream + + /** + * Returns a print stream to be used for communication back to clients + * via standard error. + * + * @return The print stream instance or an error if the stream info is + * not found + */ + def err: PrintStream + + /** + * Returns an input stream to be used to receive information from the client. + * + * @return The input stream instance or an error if the stream info is + * not found + */ + def in: InputStream + + /** + * Represents data to be shared using the kernel as the middleman. + * + * @note Using Java structure to enable other languages to have easy access! + */ + val data: java.util.Map[String, Any] + + + def interpreter(name: String): Option[com.ibm.spark.interpreter.Interpreter] + + def config: Config + + def sparkContext: SparkContext + + def sparkConf: SparkConf + + def javaSparkContext: JavaSparkContext + + def sqlContext: SQLContext +} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/org/apache/toree/kernel/api/KernelOptions.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/org/apache/toree/kernel/api/KernelOptions.scala b/kernel-api/src/main/scala/org/apache/toree/kernel/api/KernelOptions.scala new file mode 100644 index 0000000..00d00c9 --- /dev/null +++ b/kernel-api/src/main/scala/org/apache/toree/kernel/api/KernelOptions.scala @@ -0,0 +1,22 @@ +/* + * Copyright 2015 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ibm.spark.kernel.api + + +object KernelOptions { + var showTypes: Boolean = false + var noTruncation: Boolean = false +} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/org/apache/toree/kernel/api/StreamInfo.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/org/apache/toree/kernel/api/StreamInfo.scala b/kernel-api/src/main/scala/org/apache/toree/kernel/api/StreamInfo.scala new file mode 100644 index 0000000..24cef4c --- /dev/null +++ b/kernel-api/src/main/scala/org/apache/toree/kernel/api/StreamInfo.scala @@ -0,0 +1,28 @@ +/* + * Copyright 2014 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ibm.spark.kernel.api + +/** + * Represents a "wrapper" for information needed to stream stdout/stderr from + * the kernel to a client. + * + * @note This exists because the KernelMessage instance is defined in the + * protocol project, which is not brought into this project. Furthermore, + * it is better practice to provide an explicit wrapper type rather than + * a more common type for implicit use. + */ +trait StreamInfo http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/org/apache/toree/kernel/api/StreamMethodsLike.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/org/apache/toree/kernel/api/StreamMethodsLike.scala b/kernel-api/src/main/scala/org/apache/toree/kernel/api/StreamMethodsLike.scala new file mode 100644 index 0000000..4e7d9d8 --- /dev/null +++ b/kernel-api/src/main/scala/org/apache/toree/kernel/api/StreamMethodsLike.scala @@ -0,0 +1,13 @@ +package com.ibm.spark.kernel.api + +/** + * Represents the methods available to stream data from the kernel to the + * client. + */ +trait StreamMethodsLike { + /** + * Sends all text provided as one stream message to the client. + * @param text The text to wrap in a stream message + */ + def sendAll(text: String): Unit +} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/org/apache/toree/magic/CellMagic.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/org/apache/toree/magic/CellMagic.scala b/kernel-api/src/main/scala/org/apache/toree/magic/CellMagic.scala new file mode 100644 index 0000000..3da1f04 --- /dev/null +++ b/kernel-api/src/main/scala/org/apache/toree/magic/CellMagic.scala @@ -0,0 +1,8 @@ +package com.ibm.spark.magic + +/** + * Cell Magics change the output of a cell in IPython + */ +trait CellMagic extends Magic { + override def execute(code: String): CellMagicOutput +} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/org/apache/toree/magic/InternalClassLoader.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/org/apache/toree/magic/InternalClassLoader.scala b/kernel-api/src/main/scala/org/apache/toree/magic/InternalClassLoader.scala new file mode 100644 index 0000000..349efa6 --- /dev/null +++ b/kernel-api/src/main/scala/org/apache/toree/magic/InternalClassLoader.scala @@ -0,0 +1,53 @@ +/* + * Copyright 2014 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ibm.spark.magic + +/** + * Represents a classloader that can load classes from within. + * + * @param classLoader The classloader to use for internal retrieval + * (defaults to self's classloader) + */ +class InternalClassLoader( + classLoader: ClassLoader = classOf[InternalClassLoader].getClassLoader +) extends ClassLoader(classLoader) { + + // TODO: Provides an exposed reference to the super loadClass to be stubbed + // out in tests. + private[magic] def parentLoadClass(name: String, resolve: Boolean) = + super.loadClass(name, resolve) + + /** + * Attempts to load the class using the local package of the builtin loader + * as the base of the name if unable to load normally. + * + * @param name The name of the class to load + * @param resolve If true, then resolve the class + * + * @return The class instance of a ClassNotFoundException + */ + override def loadClass(name: String, resolve: Boolean): Class[_] = + try { + val packageName = this.getClass.getPackage.getName + val className = name.split('.').last + + parentLoadClass(packageName + "." + className, resolve) + } catch { + case ex: ClassNotFoundException => + parentLoadClass(name, resolve) + } +} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/org/apache/toree/magic/LineMagic.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/org/apache/toree/magic/LineMagic.scala b/kernel-api/src/main/scala/org/apache/toree/magic/LineMagic.scala new file mode 100644 index 0000000..0a54e85 --- /dev/null +++ b/kernel-api/src/main/scala/org/apache/toree/magic/LineMagic.scala @@ -0,0 +1,9 @@ +package com.ibm.spark.magic + +/** + * Line Magics perform some function and don't return anything. I.e. you cannot + * do `val x = %runMyCode 1 2 3` or alter the MIMEType of the cell. + */ +trait LineMagic extends Magic { + override def execute(code: String): Unit +} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/org/apache/toree/magic/Magic.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/org/apache/toree/magic/Magic.scala b/kernel-api/src/main/scala/org/apache/toree/magic/Magic.scala new file mode 100644 index 0000000..0e41b35 --- /dev/null +++ b/kernel-api/src/main/scala/org/apache/toree/magic/Magic.scala @@ -0,0 +1,29 @@ +/* + * Copyright 2014 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ibm.spark.magic + +/** + * Represents the base structure for a magic that is loaded and executed. + */ +trait Magic { + /** + * Execute a magic. + * @param code The code + * @return The output of the magic + */ + def execute(code: String): Any + } http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/org/apache/toree/magic/MagicExecutor.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/org/apache/toree/magic/MagicExecutor.scala b/kernel-api/src/main/scala/org/apache/toree/magic/MagicExecutor.scala new file mode 100644 index 0000000..f74c9f6 --- /dev/null +++ b/kernel-api/src/main/scala/org/apache/toree/magic/MagicExecutor.scala @@ -0,0 +1,34 @@ +package com.ibm.spark.magic + +import com.ibm.spark.utils.DynamicReflectionSupport + +import scala.language.dynamics + +class MagicExecutor(magicLoader: MagicLoader) extends Dynamic { + + val executeMethod = classOf[Magic].getDeclaredMethods.head.getName + + def applyDynamic(name: String)(args: Any*): Either[CellMagicOutput, LineMagicOutput] = { + val className = magicLoader.magicClassName(name) + val isCellMagic = magicLoader.hasCellMagic(className) + val isLineMagic = magicLoader.hasLineMagic(className) + + (isCellMagic, isLineMagic) match { + case (true, false) => + val result = executeMagic(className, args) + Left(result.asInstanceOf[CellMagicOutput]) + case (false, true) => + executeMagic(className, args) + Right(LineMagicOutput) + case (_, _) => + Left(CellMagicOutput("text/plain" -> + s"Magic ${className} could not be executed.")) + } + } + + private def executeMagic(className: String, args: Seq[Any]) = { + val inst = magicLoader.createMagicInstance(className) + val dynamicSupport = new DynamicReflectionSupport(inst.getClass, inst) + dynamicSupport.applyDynamic(executeMethod)(args) + } +} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/org/apache/toree/magic/MagicLoader.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/org/apache/toree/magic/MagicLoader.scala b/kernel-api/src/main/scala/org/apache/toree/magic/MagicLoader.scala new file mode 100644 index 0000000..c700c9e --- /dev/null +++ b/kernel-api/src/main/scala/org/apache/toree/magic/MagicLoader.scala @@ -0,0 +1,137 @@ +/* + * Copyright 2014 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ibm.spark.magic + +import java.net.{URL, URLClassLoader} + +import com.google.common.reflect.ClassPath +import com.ibm.spark.magic.dependencies.DependencyMap + +import scala.reflect.runtime.{universe => runtimeUniverse} +import scala.collection.JavaConversions._ + +class MagicLoader( + var dependencyMap: DependencyMap = new DependencyMap(), + urls: Array[URL] = Array(), + parentLoader: ClassLoader = null +) extends URLClassLoader(urls, parentLoader) { + private val magicPackage = "com.ibm.spark.magic.builtin" + + /** + * Checks whether a magic with a given name, implementing a given interface, + * exists. + * @param name case insensitive magic name + * @param interface interface + * @return true if a magic with the given name and interface exists + */ + private def hasSpecificMagic(name: String, interface: Class[_]) : Boolean = { + val className = magicClassName(name) + try { + val clazz = loadClass(className) + clazz.getInterfaces.contains(interface) + } catch { + case _: Throwable => false + } + } + + /** + * Checks whether a line magic exists. + * @param name case insensitive line magic name + * @return true if the line magic exists + */ + def hasLineMagic(name: String): Boolean = + hasSpecificMagic(name, classOf[LineMagic]) + + /** + * Checks whether a cell magic exists. + * @param name case insensitive cell magic name + * @return true if the cell magic exists + */ + def hasCellMagic(name: String): Boolean = + hasSpecificMagic(name, classOf[CellMagic]) + + /** + * Attempts to load a class with a given name from a package. + * @param name the name of the class + * @param resolve whether to resolve the class or not + * @return the class if found + */ + override def loadClass(name: String, resolve: Boolean): Class[_] = + try { + super.loadClass(magicPackage + "." + name, resolve) + } catch { + case ex: ClassNotFoundException => + super.loadClass(name, resolve) + } + + /** + * Returns the class name for a case insensitive magic name query. + * If no match is found, returns the query. + * @param query a magic name, e.g. jAvasCRipt + * @return the queried magic name's corresponding class, e.g. JavaScript + */ + def magicClassName(query: String): String = { + lowercaseClassMap(magicClassNames).getOrElse(query.toLowerCase, query) + } + + /** + * @return list of magic class names in magicPackage. + */ + protected def magicClassNames : List[String] = { + val classPath: ClassPath = ClassPath.from(this) + val classes = classPath.getTopLevelClasses(magicPackage) + classes.asList.map(_.getSimpleName).toList + } + + /** + * @param names list of class names + * @return map of lowercase class names to class names + */ + private def lowercaseClassMap(names: List[String]): Map[String, String] = { + names.map(n => (n.toLowerCase, n)).toMap + } + + def addJar(jar: URL) = addURL(jar) + /** + * Creates a instance of the specified magic with dependencies added. + * @param name name of magic class + * @return instance of the Magic corresponding to the given name + */ + protected[magic] def createMagicInstance(name: String): Any = { + val magicClass = loadClass(name) // Checks parent loadClass first + + val runtimeMirror = runtimeUniverse.runtimeMirror(this) + val classSymbol = runtimeMirror.staticClass(magicClass.getCanonicalName) + val classMirror = runtimeMirror.reflectClass(classSymbol) + val selfType = classSymbol.selfType + + val classConstructorSymbol = + selfType.declaration(runtimeUniverse.nme.CONSTRUCTOR).asMethod + val classConstructorMethod = + classMirror.reflectConstructor(classConstructorSymbol) + + val magicInstance = classConstructorMethod() + + + // Add all of our dependencies to the new instance + dependencyMap.internalMap.filter(selfType <:< _._1).values.foreach( + _(magicInstance.asInstanceOf[Magic]) + ) + + magicInstance + } +} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/org/apache/toree/magic/dependencies/DependencyMap.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/org/apache/toree/magic/dependencies/DependencyMap.scala b/kernel-api/src/main/scala/org/apache/toree/magic/dependencies/DependencyMap.scala new file mode 100644 index 0000000..f641a50 --- /dev/null +++ b/kernel-api/src/main/scala/org/apache/toree/magic/dependencies/DependencyMap.scala @@ -0,0 +1,170 @@ +/* + * Copyright 2015 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ibm.spark.magic.dependencies + +import java.io.OutputStream + +import com.ibm.spark.interpreter.Interpreter +import com.ibm.spark.kernel.api.KernelLike +import com.ibm.spark.magic.{MagicLoader, Magic} +import com.typesafe.config.Config +import org.apache.spark.SparkContext +import org.apache.spark.sql.SQLContext + +import scala.reflect.runtime.universe._ +import com.ibm.spark.dependencies.DependencyDownloader + +/** + * Represents a mapping of dependency types to implementations. + * + * TODO: Explore Scala macros to avoid duplicate code. + */ +class DependencyMap { + val internalMap = + scala.collection.mutable.Map[Type, PartialFunction[Magic, Unit]]() + + /** + * Sets the Interpreter for this map. + * @param interpreter The new Interpreter + */ + def setInterpreter(interpreter: Interpreter) = { + internalMap(typeOf[IncludeInterpreter]) = + PartialFunction[Magic, Unit]( + magic => + magic.asInstanceOf[IncludeInterpreter].interpreter_=(interpreter) + ) + + this + } + + /** + * Sets the Interpreter for this map. + * @param interpreter The new Interpreter + */ + //@deprecated("Use setInterpreter with IncludeInterpreter!", "2015.05.06") + def setKernelInterpreter(interpreter: Interpreter) = { + internalMap(typeOf[IncludeKernelInterpreter]) = + PartialFunction[Magic, Unit]( + magic => + magic.asInstanceOf[IncludeKernelInterpreter].kernelInterpreter_=(interpreter) + ) + + this + } + + /** + * Sets the SparkContext for this map. + * @param sparkContext The new SparkContext + */ + def setSparkContext(sparkContext: SparkContext) = { + internalMap(typeOf[IncludeSparkContext]) = + PartialFunction[Magic, Unit]( + magic => + magic.asInstanceOf[IncludeSparkContext].sparkContext_=(sparkContext) + ) + + this + } + + /** + * Sets the SQLContext for this map. + * @param sqlContext The new SQLContext + */ + def setSQLContext(sqlContext: SQLContext) = { + internalMap(typeOf[IncludeSQLContext]) = + PartialFunction[Magic, Unit]( + magic => + magic.asInstanceOf[IncludeSQLContext].sqlContext_=(sqlContext) + ) + + this + } + + /** + * Sets the OutputStream for this map. + * @param outputStream The new OutputStream + */ + def setOutputStream(outputStream: OutputStream) = { + internalMap(typeOf[IncludeOutputStream]) = + PartialFunction[Magic, Unit]( + magic => + magic.asInstanceOf[IncludeOutputStream].outputStream_=(outputStream) + ) + + this + } + + /** + * Sets the DependencyDownloader for this map. + * @param dependencyDownloader The new DependencyDownloader + */ + def setDependencyDownloader(dependencyDownloader: DependencyDownloader) = { + internalMap(typeOf[IncludeDependencyDownloader]) = + PartialFunction[Magic, Unit]( + magic => + magic.asInstanceOf[IncludeDependencyDownloader] + .dependencyDownloader_=(dependencyDownloader) + ) + + this + } + + /** + * Sets the Kernel Object for this map. + * @param kernel The new Kernel + */ + def setKernel(kernel: KernelLike) = { + internalMap(typeOf[IncludeKernel]) = + PartialFunction[Magic, Unit]( + magic => + magic.asInstanceOf[IncludeKernel] + .kernel_=(kernel) + ) + + this + } + + /** + * Sets the MagicLoader for this map. + * @param magicLoader The new MagicLoader + */ + def setMagicLoader(magicLoader: MagicLoader) = { + internalMap(typeOf[IncludeMagicLoader]) = + PartialFunction[Magic, Unit]( + magic => + magic.asInstanceOf[IncludeMagicLoader] + .magicLoader_=(magicLoader) + ) + + this + } + + /** + * Sets the Config Object for this map. + * @param config The config for the kernel + */ + def setConfig(config: Config) = { + internalMap(typeOf[IncludeConfig]) = + PartialFunction[Magic, Unit]( + magic => + magic.asInstanceOf[IncludeConfig] + .config=(config) + ) + + this + } +} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/org/apache/toree/magic/dependencies/IncludeConfig.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/org/apache/toree/magic/dependencies/IncludeConfig.scala b/kernel-api/src/main/scala/org/apache/toree/magic/dependencies/IncludeConfig.scala new file mode 100644 index 0000000..675c084 --- /dev/null +++ b/kernel-api/src/main/scala/org/apache/toree/magic/dependencies/IncludeConfig.scala @@ -0,0 +1,30 @@ +/* + * Copyright 2015 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ibm.spark.magic.dependencies + +import com.ibm.spark.magic.{Magic} +import com.typesafe.config.Config + +trait IncludeConfig { + this: Magic => + + private var _config: Config = _ + def config: Config = _config + def config_= (newConfig: Config) = + _config = newConfig +} + http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/org/apache/toree/magic/dependencies/IncludeDependencyDownloader.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/org/apache/toree/magic/dependencies/IncludeDependencyDownloader.scala b/kernel-api/src/main/scala/org/apache/toree/magic/dependencies/IncludeDependencyDownloader.scala new file mode 100644 index 0000000..109fbd1 --- /dev/null +++ b/kernel-api/src/main/scala/org/apache/toree/magic/dependencies/IncludeDependencyDownloader.scala @@ -0,0 +1,29 @@ +/* + * Copyright 2014 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ibm.spark.magic.dependencies + +import com.ibm.spark.dependencies.DependencyDownloader +import com.ibm.spark.magic.Magic + +trait IncludeDependencyDownloader { + this: Magic => + + private var _dependencyDownloader: DependencyDownloader = _ + def dependencyDownloader: DependencyDownloader = _dependencyDownloader + def dependencyDownloader_=(newDependencyDownloader: DependencyDownloader) = + _dependencyDownloader = newDependencyDownloader +} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/org/apache/toree/magic/dependencies/IncludeInterpreter.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/org/apache/toree/magic/dependencies/IncludeInterpreter.scala b/kernel-api/src/main/scala/org/apache/toree/magic/dependencies/IncludeInterpreter.scala new file mode 100644 index 0000000..fb01131 --- /dev/null +++ b/kernel-api/src/main/scala/org/apache/toree/magic/dependencies/IncludeInterpreter.scala @@ -0,0 +1,30 @@ +/* + * Copyright 2014 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ibm.spark.magic.dependencies + +import com.ibm.spark.interpreter.Interpreter +import com.ibm.spark.magic.Magic + +trait IncludeInterpreter { + this: Magic => + + //val interpreter: Interpreter + private var _interpreter: Interpreter = _ + def interpreter: Interpreter = _interpreter + def interpreter_=(newInterpreter: Interpreter) = + _interpreter = newInterpreter +} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/org/apache/toree/magic/dependencies/IncludeKernel.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/org/apache/toree/magic/dependencies/IncludeKernel.scala b/kernel-api/src/main/scala/org/apache/toree/magic/dependencies/IncludeKernel.scala new file mode 100644 index 0000000..fca3cb1 --- /dev/null +++ b/kernel-api/src/main/scala/org/apache/toree/magic/dependencies/IncludeKernel.scala @@ -0,0 +1,29 @@ +/* + * Copyright 2014 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ibm.spark.magic.dependencies + +import com.ibm.spark.kernel.api.KernelLike +import com.ibm.spark.magic.Magic + +trait IncludeKernel { + this: Magic => + + private var _kernel: KernelLike = _ + def kernel: KernelLike = _kernel + def kernel_=(newKernel: KernelLike) = + _kernel = newKernel +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/org/apache/toree/magic/dependencies/IncludeKernelInterpreter.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/org/apache/toree/magic/dependencies/IncludeKernelInterpreter.scala b/kernel-api/src/main/scala/org/apache/toree/magic/dependencies/IncludeKernelInterpreter.scala new file mode 100644 index 0000000..de19c07 --- /dev/null +++ b/kernel-api/src/main/scala/org/apache/toree/magic/dependencies/IncludeKernelInterpreter.scala @@ -0,0 +1,31 @@ +/* + * Copyright 2014 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ibm.spark.magic.dependencies + +import com.ibm.spark.interpreter.Interpreter +import com.ibm.spark.magic.Magic + +//@deprecated("Use IncludeInterpreter instead!", "2015.05.06") +trait IncludeKernelInterpreter { + this: Magic => + + //val interpreter: Interpreter + private var _interpreter: Interpreter = _ + def kernelInterpreter: Interpreter = _interpreter + def kernelInterpreter_=(newInterpreter: Interpreter) = + _interpreter = newInterpreter +} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/org/apache/toree/magic/dependencies/IncludeMagicLoader.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/org/apache/toree/magic/dependencies/IncludeMagicLoader.scala b/kernel-api/src/main/scala/org/apache/toree/magic/dependencies/IncludeMagicLoader.scala new file mode 100644 index 0000000..0a78508 --- /dev/null +++ b/kernel-api/src/main/scala/org/apache/toree/magic/dependencies/IncludeMagicLoader.scala @@ -0,0 +1,30 @@ +/* + * Copyright 2014 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ibm.spark.magic.dependencies + +import com.ibm.spark.magic.{MagicLoader, Magic} + + +trait IncludeMagicLoader { + this: Magic => + + //val sparkContext: SparkContext + private var _magicLoader: MagicLoader = _ + def magicLoader: MagicLoader = _magicLoader + def magicLoader_=(newMagicLoader: MagicLoader) = + _magicLoader = newMagicLoader +} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/org/apache/toree/magic/dependencies/IncludeOutputStream.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/org/apache/toree/magic/dependencies/IncludeOutputStream.scala b/kernel-api/src/main/scala/org/apache/toree/magic/dependencies/IncludeOutputStream.scala new file mode 100644 index 0000000..a3e679e --- /dev/null +++ b/kernel-api/src/main/scala/org/apache/toree/magic/dependencies/IncludeOutputStream.scala @@ -0,0 +1,31 @@ +/* + * Copyright 2014 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ibm.spark.magic.dependencies + +import java.io.OutputStream + +import com.ibm.spark.magic.Magic + +trait IncludeOutputStream { + this: Magic => + + //val outputStream: OutputStream + private var _outputStream: OutputStream = _ + def outputStream: OutputStream = _outputStream + def outputStream_=(newOutputStream: OutputStream) = + _outputStream = newOutputStream +} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/org/apache/toree/magic/dependencies/IncludeSQLContext.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/org/apache/toree/magic/dependencies/IncludeSQLContext.scala b/kernel-api/src/main/scala/org/apache/toree/magic/dependencies/IncludeSQLContext.scala new file mode 100644 index 0000000..5a9b26c --- /dev/null +++ b/kernel-api/src/main/scala/org/apache/toree/magic/dependencies/IncludeSQLContext.scala @@ -0,0 +1,13 @@ +package com.ibm.spark.magic.dependencies + +import com.ibm.spark.magic.Magic +import org.apache.spark.sql.SQLContext + +trait IncludeSQLContext { + this: Magic => + + private var _sqlContext: SQLContext = _ + def sqlContext: SQLContext = _sqlContext + def sqlContext_=(newSqlContext: SQLContext) = + _sqlContext = newSqlContext +} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/org/apache/toree/magic/dependencies/IncludeSparkContext.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/org/apache/toree/magic/dependencies/IncludeSparkContext.scala b/kernel-api/src/main/scala/org/apache/toree/magic/dependencies/IncludeSparkContext.scala new file mode 100644 index 0000000..df5e245 --- /dev/null +++ b/kernel-api/src/main/scala/org/apache/toree/magic/dependencies/IncludeSparkContext.scala @@ -0,0 +1,30 @@ +/* + * Copyright 2014 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ibm.spark.magic.dependencies + +import com.ibm.spark.magic.Magic +import org.apache.spark.SparkContext + +trait IncludeSparkContext { + this: Magic => + + //val sparkContext: SparkContext + private var _sparkContext: SparkContext = _ + def sparkContext: SparkContext = _sparkContext + def sparkContext_=(newSparkContext: SparkContext) = + _sparkContext = newSparkContext +} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/org/apache/toree/magic/package.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/org/apache/toree/magic/package.scala b/kernel-api/src/main/scala/org/apache/toree/magic/package.scala new file mode 100644 index 0000000..292d641 --- /dev/null +++ b/kernel-api/src/main/scala/org/apache/toree/magic/package.scala @@ -0,0 +1,32 @@ +/* + * Copyright 2014 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ibm.spark + +package object magic { + /** + * Represents the output of a magic execution. + */ + // TODO: This is a duplicate of Data in kernel protocol, needs to be given + // a type/val that can be translated into a specific protocol via + // implicits - or some other transformation - to separate this from + // the protocol type + type CellMagicOutput = Map[String, String] + val CellMagicOutput = Map + + type LineMagicOutput = Unit + val LineMagicOutput : LineMagicOutput = () +}