Repository: incubator-toree Updated Branches: refs/heads/master 31b2039ee -> 4463bf460
[TOREE 278]: Replaced RDD Magic With DataFrame Added DataFrame magic to replace RDD magic Added support to output DataFrames in different types Project: http://git-wip-us.apache.org/repos/asf/incubator-toree/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-toree/commit/5a2b79e2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-toree/tree/5a2b79e2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-toree/diff/5a2b79e2 Branch: refs/heads/master Commit: 5a2b79e25e75f24c64780361147aeae40bc13f47 Parents: 31b2039 Author: Corey A. Stubbs <cstu...@us.ibm.com> Authored: Wed May 4 15:06:00 2016 -0500 Committer: Corey A. Stubbs <cstu...@us.ibm.com> Committed: Fri May 6 12:58:36 2016 -0500 ---------------------------------------------------------------------- etc/examples/notebooks/magic-tutorial.ipynb | 156 +++++++++++++++ .../apache/toree/magic/builtin/Dataframe.scala | 149 ++++++++++++++ .../org/apache/toree/magic/builtin/RDD.scala | 64 ------ .../apache/toree/utils/DataFrameConverter.scala | 76 +++++++ .../org/apache/toree/utils/json/RddToJson.scala | 42 ---- .../toree/magic/builtin/DataFrameSpec.scala | 196 +++++++++++++++++++ .../apache/toree/magic/builtin/RDDSpec.scala | 118 ----------- .../toree/utils/DataFrameConverterSpec.scala | 79 ++++++++ .../apache/toree/utils/json/RddToJsonSpec.scala | 56 ------ 9 files changed, 656 insertions(+), 280 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5a2b79e2/etc/examples/notebooks/magic-tutorial.ipynb ---------------------------------------------------------------------- diff --git a/etc/examples/notebooks/magic-tutorial.ipynb b/etc/examples/notebooks/magic-tutorial.ipynb index 8725868..3d3ca28 100644 --- a/etc/examples/notebooks/magic-tutorial.ipynb +++ b/etc/examples/notebooks/magic-tutorial.ipynb @@ -514,6 +514,162 @@ }, { "cell_type": "markdown", + "metadata": {}, + "source": [ + "### %%DataFrame\n", + "The 
`%%DataFrame` magic is used to convert a Spark SQL DataFrame into various formats. Currently, `json`, `html`, and `csv` are supported. The magic takes an expression, which evaluates to a dataframe, to perform the conversion. So, we first need to create a DataFrame object for reference." + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "case class DFRecord(key: String, value: Int)\n", + "val sqlc = sqlContext\n", + "import sqlc.implicits._\n", + "val df = sc.parallelize(1 to 10).map(x => DFRecord(x.toString, x)).toDF()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The default output is `html`" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": { + "collapsed": false + }, + "outputs": [ + { + "data": { + "text/plain": [ + "%%dataframe [arguments]\n", + "DATAFRAME_CODE\n", + "\n", + "DATAFRAME_CODE can be any numbered lines of code, as long as the\n", + "last line is a reference to a variable which is a DataFrame.\n", + " Option Description \n", + "------ ----------- \n", + "--help Displays the help and usage text for \n", + " this magic. 
\n", + "--limit The number of records to return \n", + " (default: 10) \n", + "--output The type of the output: html \n", + " (default), csv, json (default: html)\n" + ] + }, + "execution_count": 1, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "%%dataframe" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": { + "collapsed": false + }, + "outputs": [ + { + "data": { + "text/html": [ + "<table><tr><th>key</th><th>value</th></tr><tr><td>1</td><td>1</td></tr><tr><td>2</td><td>2</td></tr><tr><td>3</td><td>3</td></tr><tr><td>4</td><td>4</td></tr><tr><td>5</td><td>5</td></tr><tr><td>6</td><td>6</td></tr><tr><td>7</td><td>7</td></tr><tr><td>8</td><td>8</td></tr><tr><td>9</td><td>9</td></tr><tr><td>10</td><td>10</td></tr></table>" + ] + }, + "execution_count": 2, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "%%dataframe\n", + "df" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "You can specify the `--output` argument to change the output type." + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": { + "collapsed": false + }, + "outputs": [ + { + "data": { + "text/plain": [ + "key,value\n", + "1,1\n", + "2,2\n", + "3,3\n", + "4,4\n", + "5,5\n", + "6,6\n", + "7,7\n", + "8,8\n", + "9,9\n", + "10,10" + ] + }, + "execution_count": 3, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "%%dataframe --output=csv\n", + "df" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "There is also an option to limit the number of records returned." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": { + "collapsed": false + }, + "outputs": [ + { + "data": { + "text/html": [ + "<table><tr><th>key</th><th>value</th></tr><tr><td>1</td><td>1</td></tr><tr><td>2</td><td>2</td></tr><tr><td>3</td><td>3</td></tr></table>" + ] + }, + "execution_count": 4, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "%%dataframe --limit=3\n", + "df" + ] + }, + { + "cell_type": "markdown", "metadata": { "collapsed": true }, http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5a2b79e2/kernel/src/main/scala/org/apache/toree/magic/builtin/Dataframe.scala ---------------------------------------------------------------------- diff --git a/kernel/src/main/scala/org/apache/toree/magic/builtin/Dataframe.scala b/kernel/src/main/scala/org/apache/toree/magic/builtin/Dataframe.scala new file mode 100644 index 0000000..3f00f82 --- /dev/null +++ b/kernel/src/main/scala/org/apache/toree/magic/builtin/Dataframe.scala @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.toree.magic.builtin + +import java.io.{PrintStream, StringWriter} + +import org.apache.toree.interpreter.{ExecuteAborted, ExecuteError, ExecuteFailure, Results} +import org.apache.toree.kernel.protocol.v5._ +import org.apache.toree.magic._ +import org.apache.toree.magic.dependencies.{IncludeKernelInterpreter, IncludeOutputStream} +import org.apache.toree.plugins.annotations.{Event, Init} +import org.apache.toree.utils.{ArgumentParsingSupport, DataFrameConverter, LogLike} + +import scala.util.Try + + +class DFConversionException extends Exception{} + + +object DataFrameResponses { + val MagicAborted = s"${classOf[DataFrame].getSimpleName} magic aborted!" + + def ErrorMessage(outputType: String, error: String) = { + s"An error occurred converting DataFrame to ${outputType}.\n${error}" + } + + def NoVariableFound(name: String) = { + s"No variable found with the name ${name}!" + } + + val Incomplete = "DataFrame code was an incomplete code snippet" + + val Usage = + """%%dataframe [arguments] + |DATAFRAME_CODE + | + |DATAFRAME_CODE can be any numbered lines of code, as long as the + |last line is a reference to a variable which is a DataFrame. 
+ """.stripMargin +} + +class DataFrame extends CellMagic with IncludeKernelInterpreter + with IncludeOutputStream with ArgumentParsingSupport with LogLike { + private var _dataFrameConverter: DataFrameConverter = _ + private val outputTypeMap = Map[String, String]( + "html" -> MIMEType.TextHtml, + "csv" -> MIMEType.PlainText, + "json" -> MIMEType.ApplicationJson + ) + + @Init def initMethod(dataFrameConverter: DataFrameConverter) = { + _dataFrameConverter = dataFrameConverter + } + private def printStream = new PrintStream(outputStream) + + private val _outputType = parser.accepts( + "output", "The type of the output: html, csv, json" + ).withRequiredArg().defaultsTo("html") + + private val _limit = parser.accepts( + "limit", "The number of records to return" + ).withRequiredArg().defaultsTo("10") + + private def outputType(): String = { + _outputType.getOrElse("html") + } + private def limit(): Int = { + _limit.getOrElse("10").toInt + } + + private def outputTypeToMimeType(): String = { + outputTypeMap.getOrElse(outputType, MIMEType.PlainText) + } + + private def convertToJson(rddCode: String): CellMagicOutput = { + val (result, message) = kernelInterpreter.interpret(rddCode) + result match { + case Results.Success => + val rddVarName = kernelInterpreter.lastExecutionVariableName.get + kernelInterpreter.read(rddVarName).map(variableVal => { + _dataFrameConverter.convert( + variableVal.asInstanceOf[org.apache.spark.sql.DataFrame], + outputType, + limit + ).map(output => + CellMagicOutput(outputTypeToMimeType -> output) + ).get + }).getOrElse(CellMagicOutput(MIMEType.PlainText -> DataFrameResponses.NoVariableFound(rddVarName))) + case Results.Aborted => + logger.error(DataFrameResponses.ErrorMessage(outputType, DataFrameResponses.MagicAborted)) + CellMagicOutput( + MIMEType.PlainText -> DataFrameResponses.ErrorMessage(outputType, DataFrameResponses.MagicAborted) + ) + case Results.Error => + val error = message.right.get.asInstanceOf[ExecuteError] + val 
errorMessage = DataFrameResponses.ErrorMessage(outputType, error.value) + logger.error(errorMessage) + CellMagicOutput(MIMEType.PlainText -> errorMessage) + case Results.Incomplete => + logger.error(DataFrameResponses.Incomplete) + CellMagicOutput(MIMEType.PlainText -> DataFrameResponses.Incomplete) + } + } + + private def helpToCellMagicOutput(optionalException: Option[Exception] = None): CellMagicOutput = { + val stringWriter = new StringWriter() + stringWriter.append(optionalException.map(e => { + s"ERROR: ${e.getMessage}\n" + }).getOrElse("")) + stringWriter.write(DataFrameResponses.Usage) + parser.printHelpOn(stringWriter) + CellMagicOutput(MIMEType.PlainText -> stringWriter.toString) + } + + @Event(name = "dataframe") + override def execute(code: String): CellMagicOutput = { + val lines = code.trim.split("\n") + Try({ + val res: CellMagicOutput = if (lines.length == 1 && lines.head.length == 0){ + helpToCellMagicOutput() + } else if (lines.length == 1) { + parseArgs("") + convertToJson(lines.head) + } else { + parseArgs(lines.head) + convertToJson(lines.drop(1).reduce(_ + _)) + } + res + }).recover({ + case e: Exception => + helpToCellMagicOutput(Some(e)) + }).get + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5a2b79e2/kernel/src/main/scala/org/apache/toree/magic/builtin/RDD.scala ---------------------------------------------------------------------- diff --git a/kernel/src/main/scala/org/apache/toree/magic/builtin/RDD.scala b/kernel/src/main/scala/org/apache/toree/magic/builtin/RDD.scala deleted file mode 100644 index b165ac8..0000000 --- a/kernel/src/main/scala/org/apache/toree/magic/builtin/RDD.scala +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License - */ - -package org.apache.toree.magic.builtin - -import org.apache.toree.interpreter.{ExecuteFailure, Results, ExecuteAborted, ExecuteError} -import org.apache.toree.kernel.protocol.v5.MIMEType -import org.apache.toree.magic._ -import org.apache.toree.magic.dependencies.{IncludeKernelInterpreter, IncludeInterpreter} -import org.apache.toree.utils.LogLike -import org.apache.toree.utils.json.RddToJson -import org.apache.spark.sql.SchemaRDD -import org.apache.toree.plugins.annotations.Event - -/** - * Temporary magic to show an RDD as JSON - */ -class RDD extends CellMagic with IncludeKernelInterpreter with LogLike { - - private def convertToJson(code: String) = { - val (result, message) = kernelInterpreter.interpret(code) - result match { - case Results.Success => - val rddVarName = kernelInterpreter.lastExecutionVariableName.getOrElse("") - kernelInterpreter.read(rddVarName).map(rddVal => { - try{ - CellMagicOutput(MIMEType.ApplicationJson -> RddToJson.convert(rddVal.asInstanceOf[SchemaRDD])) - } catch { - case _: Throwable => - CellMagicOutput(MIMEType.PlainText -> s"Could note convert RDD to JSON: ${rddVarName}->${rddVal}") - } - }).getOrElse(CellMagicOutput(MIMEType.PlainText -> "No RDD Value found!")) - case _ => - val errorMessage = message.right.toOption match { - case Some(executeFailure) => executeFailure match { - case _: ExecuteAborted => throw new Exception("RDD magic aborted!") 
- case executeError: ExecuteError => throw new Exception(executeError.value) - } - case _ => "No error information available!" - } - logger.error(s"Error retrieving RDD value: ${errorMessage}") - CellMagicOutput(MIMEType.PlainText -> - (s"An error occurred converting RDD to JSON.\n${errorMessage}")) - } - } - - @Event(name = "rdd") - override def execute(code: String): CellMagicOutput = - convertToJson(code) -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5a2b79e2/kernel/src/main/scala/org/apache/toree/utils/DataFrameConverter.scala ---------------------------------------------------------------------- diff --git a/kernel/src/main/scala/org/apache/toree/utils/DataFrameConverter.scala b/kernel/src/main/scala/org/apache/toree/utils/DataFrameConverter.scala new file mode 100644 index 0000000..b28879e --- /dev/null +++ b/kernel/src/main/scala/org/apache/toree/utils/DataFrameConverter.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.toree.utils + +import org.apache.spark.sql.DataFrame +import org.apache.toree.plugins.Plugin +import play.api.libs.json.{JsObject, Json} + +import scala.util.{Failure, Try} +import org.apache.toree.plugins.annotations.Init + +class DataFrameConverter extends Plugin with LogLike { + @Init def init() = { + register(this) + } + + def convert( + df: DataFrame, outputType: String, limit: Int = 10 + ): Try[String] = { + Try( + outputType.toLowerCase() match { + case "html" => + convertToHtml(df = df, limit = limit) + case "json" => + convertToJson(df = df, limit = limit) + case "csv" => + convertToCsv(df = df, limit = limit) + } + ) + } + + private def convertToHtml(df: DataFrame, limit: Int = 10): String = { + val columnFields = df.schema.fieldNames.map(columnName => { + s"<th>${columnName}</th>" + }).reduce(_ + _) + val columns = s"<tr>${columnFields}</tr>" + val rows = df.map(row => { + val fieldValues = row.toSeq.map(field => { + s"<td>${field.toString}</td>" + }).reduce(_ + _) + s"<tr>${fieldValues}</tr>" + }).take(limit).reduce(_ + _) + s"<table>${columns}${rows}</table>" + } + + private def convertToJson(df: DataFrame, limit: Int = 10): String = { + JsObject(Seq( + "columns" -> Json.toJson(df.schema.fieldNames), + "rows" -> Json.toJson(df.map(row => + row.toSeq.map(_.toString).toArray).take(limit)) + )).toString() + } + + private def convertToCsv(df: DataFrame, limit: Int = 10): String = { + val headers = df.schema.fieldNames.reduce(_ + "," + _) + val rows = df.map(row => { + row.toSeq.map(field => field.toString).reduce(_ + "," + _) + }).take(limit).reduce(_ + "\n" + _) + s"${headers}\n${rows}" + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5a2b79e2/kernel/src/main/scala/org/apache/toree/utils/json/RddToJson.scala ---------------------------------------------------------------------- 
diff --git a/kernel/src/main/scala/org/apache/toree/utils/json/RddToJson.scala b/kernel/src/main/scala/org/apache/toree/utils/json/RddToJson.scala deleted file mode 100644 index d3c25fb..0000000 --- a/kernel/src/main/scala/org/apache/toree/utils/json/RddToJson.scala +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License - */ - -package org.apache.toree.utils.json - -import org.apache.spark.sql.{DataFrame, SchemaRDD} -import play.api.libs.json.{JsObject, JsString, Json} - -/** - * Utility to convert RDD to JSON. - */ -object RddToJson { - - /** - * Converts a SchemaRDD to a JSON table format. 
- * - * @param rdd The schema rdd (now a dataframe) to convert - * - * @return The resulting string representing the JSON - */ - def convert(rdd: DataFrame, limit: Int = 10): String = - JsObject(Seq( - "type" -> JsString("rdd/schema"), - "columns" -> Json.toJson(rdd.schema.fieldNames), - "rows" -> Json.toJson(rdd.map(row => - row.toSeq.map(_.toString).toArray).take(limit)) - )).toString() -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5a2b79e2/kernel/src/test/scala/org/apache/toree/magic/builtin/DataFrameSpec.scala ---------------------------------------------------------------------- diff --git a/kernel/src/test/scala/org/apache/toree/magic/builtin/DataFrameSpec.scala b/kernel/src/test/scala/org/apache/toree/magic/builtin/DataFrameSpec.scala new file mode 100644 index 0000000..e6c011f --- /dev/null +++ b/kernel/src/test/scala/org/apache/toree/magic/builtin/DataFrameSpec.scala @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.toree.magic.builtin + +import org.apache.toree.interpreter._ +import org.apache.toree.kernel.protocol.v5.MIMEType +import org.apache.toree.magic.dependencies.IncludeKernelInterpreter +import org.apache.toree.utils.DataFrameConverter +import org.mockito.Matchers._ +import org.mockito.Matchers.{eq => mockEq} +import org.mockito.Mockito._ +import org.scalatest.mock.MockitoSugar +import org.scalatest.{BeforeAndAfter, FunSpec, Matchers} +import scala.util.Success + +class DataFrameSpec extends FunSpec with Matchers with MockitoSugar with BeforeAndAfter { + + def createMocks = { + val interpreter = mock[Interpreter] + val converter = mock[DataFrameConverter] + val magic = new DataFrame with IncludeKernelInterpreter { + override val kernelInterpreter: Interpreter = interpreter + } + magic.initMethod(converter) + (magic, interpreter, converter) + } + + describe("DataFrame") { + describe("#execute") { + it("should return a plain text error message on aborted execution"){ + val (magic, interpreter, _) = createMocks + val message: Either[ExecuteOutput, ExecuteFailure] = Right(mock[ExecuteAborted]) + val code = "code" + doReturn((Results.Aborted,message)).when(interpreter).interpret(code, false) + val output = magic.execute(code) + output.contains(MIMEType.PlainText) should be(true) + output(MIMEType.PlainText) should be(DataFrameResponses.ErrorMessage( + "html", + DataFrameResponses.MagicAborted + )) + } + + it("should return a plain text error message on execution errors"){ + val (magic, interpreter, _) = createMocks + val mockExecuteError = mock[ExecuteError] + val mockError = "error" + doReturn(mockError).when(mockExecuteError).value + val message: Either[ExecuteOutput, ExecuteFailure] = Right(mockExecuteError) + val code = "code" + doReturn((Results.Error,message)).when(interpreter).interpret(code, false) + val output = magic.execute(code) + 
output.contains(MIMEType.PlainText) should be(true) + output(MIMEType.PlainText) should be(DataFrameResponses.ErrorMessage( + "html", + mockError + )) + } + + it("should return a plain text message when there is no variable reference"){ + val (magic, interpreter, _) = createMocks + val mockExecuteError = mock[ExecuteError] + val mockError = "error" + doReturn(mockError).when(mockExecuteError).value + val message: Either[ExecuteOutput, ExecuteFailure] = Right(mockExecuteError ) + val code = "code" + doReturn((Results.Error,message)).when(interpreter).interpret(code, false) + val output = magic.execute(code) + output.contains(MIMEType.PlainText) should be(true) + output(MIMEType.PlainText) should be(DataFrameResponses.ErrorMessage( + "html", + mockError + )) + } + + it("should return a plain text message with help when there are no args"){ + val (magic, _, _) = createMocks + val code = "" + val output = magic.execute(code) + output.contains(MIMEType.PlainText) should be(true) + output(MIMEType.PlainText).contains(DataFrameResponses.Usage) should be(true) + } + + it("should return a json message when json is the selected output"){ + val (magic, interpreter, converter) = createMocks + val outputText = "test output" + val message: Either[ExecuteOutput, ExecuteFailure] = Left(outputText) + val mockDataFrame = mock[org.apache.spark.sql.DataFrame] + val variableName = "variable" + val executeCode =s"""--output=json + |${variableName} + """.stripMargin + doReturn((Results.Success,message)).when(interpreter).interpret(variableName, false) + doReturn(Some(variableName)).when(interpreter).lastExecutionVariableName + doReturn(Some(mockDataFrame)).when(interpreter).read(variableName) + doReturn(Success(outputText)).when(converter).convert( + mockDataFrame,"json", 10 + ) + val output = magic.execute(executeCode) + output.contains(MIMEType.ApplicationJson) should be(true) + output(MIMEType.ApplicationJson).contains(outputText) should be(true) + } + + it("should return an html 
message when html is the selected output"){ + val (magic, interpreter, converter) = createMocks + val outputText = "test output" + val message: Either[ExecuteOutput, ExecuteFailure] = Left(outputText) + val mockDataFrame = mock[org.apache.spark.sql.DataFrame] + val variableName = "variable" + val executeCode =s"""--output=html + |${variableName} + """.stripMargin + doReturn((Results.Success,message)).when(interpreter).interpret(variableName, false) + doReturn(Some(variableName)).when(interpreter).lastExecutionVariableName + doReturn(Some(mockDataFrame)).when(interpreter).read(variableName) + doReturn(Success(outputText)).when(converter).convert( + mockDataFrame,"html", 10 + ) + val output = magic.execute(executeCode) + output.contains(MIMEType.TextHtml) should be(true) + output(MIMEType.TextHtml).contains(outputText) should be(true) + } + + it("should return a csv message when csv is the selected output"){ + val (magic, interpreter, converter) = createMocks + val outputText = "test output" + val message: Either[ExecuteOutput, ExecuteFailure] = Left(outputText) + val mockDataFrame = mock[org.apache.spark.sql.DataFrame] + val variableName = "variable" + val executeCode =s"""--output=csv + |${variableName} + """.stripMargin + doReturn((Results.Success,message)).when(interpreter).interpret(variableName, false) + doReturn(Some(variableName)).when(interpreter).lastExecutionVariableName + doReturn(Some(mockDataFrame)).when(interpreter).read(variableName) + doReturn(Success(outputText)).when(converter).convert( + mockDataFrame,"csv", 10 + ) + val output = magic.execute(executeCode) + output.contains(MIMEType.PlainText) should be(true) + output(MIMEType.PlainText).contains(outputText) should be(true) + } + + it("should pass the limit argument to the converter"){ + val (magic, interpreter, converter) = createMocks + val outputText = "test output" + val message: Either[ExecuteOutput, ExecuteFailure] = Left(outputText) + val mockDataFrame = mock[org.apache.spark.sql.DataFrame] 
+ val variableName = "variable" + val executeCode =s"""--output=html --limit=3 + |${variableName} + """.stripMargin + doReturn((Results.Success,message)).when(interpreter).interpret(variableName, false) + doReturn(Some(variableName)).when(interpreter).lastExecutionVariableName + doReturn(Some(mockDataFrame)).when(interpreter).read(variableName) + doReturn(Success(outputText)).when(converter).convert( + mockDataFrame,"html", 3 + ) + magic.execute(executeCode) + verify(converter).convert(any(), anyString(), mockEq(3)) + } + + it("should return a plain text message with help when the converter throws an exception"){ + val (magic, interpreter, converter) = createMocks + val outputText = "test output" + val message: Either[ExecuteOutput, ExecuteFailure] = Left(outputText) + val mockDataFrame = mock[org.apache.spark.sql.DataFrame] + val code = "variable" + doReturn((Results.Success,message)).when(interpreter).interpret(code, false) + doReturn(Some(code)).when(interpreter).lastExecutionVariableName + doReturn(Some(mockDataFrame)).when(interpreter).read(code) + doThrow(new RuntimeException()).when(converter).convert( + mockDataFrame,"html", 10 + ) + val output = magic.execute(code) + output.contains(MIMEType.PlainText) should be(true) + output(MIMEType.PlainText).contains(DataFrameResponses.Usage) should be(true) + + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5a2b79e2/kernel/src/test/scala/org/apache/toree/magic/builtin/RDDSpec.scala ---------------------------------------------------------------------- diff --git a/kernel/src/test/scala/org/apache/toree/magic/builtin/RDDSpec.scala b/kernel/src/test/scala/org/apache/toree/magic/builtin/RDDSpec.scala deleted file mode 100644 index 3062036..0000000 --- a/kernel/src/test/scala/org/apache/toree/magic/builtin/RDDSpec.scala +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License - */ - -package org.apache.toree.magic.builtin - -import org.apache.toree.interpreter.Results.Result -import org.apache.toree.interpreter.{Results, ExecuteAborted, ExecuteError, Interpreter} -import org.apache.toree.kernel.protocol.v5.MIMEType -import org.apache.toree.magic.dependencies.{IncludeKernelInterpreter, IncludeInterpreter} -import org.apache.spark.sql.DataFrame -import org.apache.spark.sql.types.StructType -import org.mockito.Matchers._ -import org.mockito.Mockito._ -import org.scalatest.mock.MockitoSugar -import org.scalatest.{BeforeAndAfter, FunSpec, Matchers} -import play.api.libs.json.Json - -class RDDSpec extends FunSpec with Matchers with MockitoSugar with BeforeAndAfter { - - val resOutput = "res1: org.apache.spark.sql.SchemaRDD =" - - val mockInterpreter = mock[Interpreter] - val mockDataFrame = mock[DataFrame] - val mockRdd = mock[org.apache.spark.rdd.RDD[Any]] - val mockStruct = mock[StructType] - val columns = Seq("foo", "bar").toArray - val rows = Array( Array("a", "b"), Array("c", "d") ) - - doReturn(mockStruct).when(mockDataFrame).schema - doReturn(columns).when(mockStruct).fieldNames - doReturn(mockRdd).when(mockDataFrame).map(any())(any()) - doReturn(rows).when(mockRdd).take(anyInt()) - - val rddMagic = new RDD with IncludeKernelInterpreter { - override val 
kernelInterpreter: Interpreter = mockInterpreter - } - - before { - doReturn(Some("someRDD")).when(mockInterpreter).lastExecutionVariableName - doReturn(Some(mockDataFrame)).when(mockInterpreter).read(anyString()) - doReturn((Results.Success, Left(resOutput))) - .when(mockInterpreter).interpret(anyString(), anyBoolean()) - } - - describe("RDD") { - describe("#execute") { - it("should return valid JSON when the executed code evaluates to a " + - "SchemaRDD") { - val magicOutput = rddMagic.execute("schemaRDD") - magicOutput.contains(MIMEType.ApplicationJson) should be (true) - Json.parse(magicOutput(MIMEType.ApplicationJson)) - } - - it("should return normally when the executed code does not evaluate to " + - "a SchemaRDD") { - doReturn((mock[Result], Left("foo"))).when(mockInterpreter) - .interpret(anyString(), anyBoolean()) - val magicOutput = rddMagic.execute("") - magicOutput.contains(MIMEType.PlainText) should be (true) - } - - it("should return error message when the interpreter does not return " + - "SchemaRDD as expected") { - doReturn(Some("foo")).when(mockInterpreter).read(anyString()) - val magicOutput = rddMagic.execute("") - magicOutput.contains(MIMEType.PlainText) should be (true) - } - - it("should throw a Throwable if the interpreter returns an ExecuteError"){ - val expected = "some error message" - val mockExecuteError = mock[ExecuteError] - doReturn(expected).when(mockExecuteError).value - - doReturn((mock[Result], Right(mockExecuteError))).when(mockInterpreter) - .interpret(anyString(), anyBoolean()) - val actual = { - val exception = intercept[Throwable] { - rddMagic.execute("") - } - exception.getLocalizedMessage - } - - actual should be (expected) - } - - it("should throw a Throwable if the interpreter returns an " + - "ExecuteAborted") { - val expected = "RDD magic aborted!" 
- val mockExecuteAborted = mock[ExecuteAborted] - - doReturn((mock[Result], Right(mockExecuteAborted))) - .when(mockInterpreter).interpret(anyString(), anyBoolean()) - val actual = { - val exception = intercept[Throwable] { - rddMagic.execute("") - } - exception.getLocalizedMessage - } - - actual should be (expected) - } - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5a2b79e2/kernel/src/test/scala/org/apache/toree/utils/DataFrameConverterSpec.scala ---------------------------------------------------------------------- diff --git a/kernel/src/test/scala/org/apache/toree/utils/DataFrameConverterSpec.scala b/kernel/src/test/scala/org/apache/toree/utils/DataFrameConverterSpec.scala new file mode 100644 index 0000000..d45ecce --- /dev/null +++ b/kernel/src/test/scala/org/apache/toree/utils/DataFrameConverterSpec.scala @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.toree.utils + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.types.StructType +import org.mockito.Matchers._ +import org.mockito.Mockito._ +import org.scalatest.mock.MockitoSugar +import org.scalatest.{FunSpec, Matchers} +import play.api.libs.json.{JsArray, JsString, Json} + +class DataFrameConverterSpec extends FunSpec with MockitoSugar with Matchers { + val dataFrameConverter: DataFrameConverter = new DataFrameConverter + val mockDataFrame = mock[DataFrame] + val mockRdd = mock[RDD[Any]] + val mockStruct = mock[StructType] + val columns = Seq("foo", "bar").toArray + val rowsOfArrays = Array( Array("a", "b"), Array("c", "d") ) + val rowsOfStrings = Array("test1","test2") + val rowsOfString = Array("test1") + + doReturn(mockStruct).when(mockDataFrame).schema + doReturn(columns).when(mockStruct).fieldNames + doReturn(mockRdd).when(mockDataFrame).map(any())(any()) + doReturn(rowsOfArrays).when(mockRdd).take(anyInt()) + + describe("DataFrameConverter") { + describe("#convert") { + it("should convert to a valid JSON object") { + val someJson = dataFrameConverter.convert(mockDataFrame, "json") + val jsValue = Json.parse(someJson.get) + jsValue \ "columns" should be (JsArray(Seq(JsString("foo"), JsString("bar")))) + jsValue \ "rows" should be (JsArray(Seq( + JsArray(Seq(JsString("a"), JsString("b"))), + JsArray(Seq(JsString("c"), JsString("d"))))) + ) + } + it("should convert to csv") { + doReturn(rowsOfStrings).when(mockRdd).take(anyInt()) + val csv = dataFrameConverter.convert(mockDataFrame, "csv").get + val values = csv.split("\n").map(_.split(",")) + values(0) should contain allOf ("foo","bar") + } + it("should convert to html") { + doReturn(rowsOfStrings).when(mockRdd).take(anyInt()) + val html = dataFrameConverter.convert(mockDataFrame, "html").get + html.contains("<th>foo</th>") 
should be(true) + html.contains("<th>bar</th>") should be(true) + } + it("should convert limit the selection") { + doReturn(rowsOfString).when(mockRdd).take(1) + val someLimited = dataFrameConverter.convert(mockDataFrame, "csv", 1) + val limitedLines = someLimited.get.split("\n") + limitedLines.length should be(2) + } + it("should return a Failure for invalid types") { + val result = dataFrameConverter.convert(mockDataFrame, "Invalid Type") + result.isFailure should be(true) + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5a2b79e2/kernel/src/test/scala/org/apache/toree/utils/json/RddToJsonSpec.scala ---------------------------------------------------------------------- diff --git a/kernel/src/test/scala/org/apache/toree/utils/json/RddToJsonSpec.scala b/kernel/src/test/scala/org/apache/toree/utils/json/RddToJsonSpec.scala deleted file mode 100644 index e2b403e..0000000 --- a/kernel/src/test/scala/org/apache/toree/utils/json/RddToJsonSpec.scala +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License - */ - -package org.apache.toree.utils.json - -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.DataFrame -import org.apache.spark.sql.types.StructType -import org.scalatest.mock.MockitoSugar -import org.scalatest.{Matchers, FunSpec} -import org.mockito.Mockito._ -import org.mockito.Matchers._ -import play.api.libs.json.{JsArray, JsString, Json} - -class RddToJsonSpec extends FunSpec with MockitoSugar with Matchers { - - val mockDataFrame = mock[DataFrame] - val mockRdd = mock[RDD[Any]] - val mockStruct = mock[StructType] - val columns = Seq("foo", "bar").toArray - val rows = Array( Array("a", "b"), Array("c", "d") ) - - doReturn(mockStruct).when(mockDataFrame).schema - doReturn(columns).when(mockStruct).fieldNames - doReturn(mockRdd).when(mockDataFrame).map(any())(any()) - doReturn(rows).when(mockRdd).take(anyInt()) - - describe("RddToJson") { - describe("#convert(SchemaRDD)") { - it("should convert to valid JSON object") { - - val json = RddToJson.convert(mockDataFrame) - val jsValue = Json.parse(json) - - jsValue \ "columns" should be (JsArray(Seq(JsString("foo"), JsString("bar")))) - jsValue \ "rows" should be (JsArray(Seq( - JsArray(Seq(JsString("a"), JsString("b"))), - JsArray(Seq(JsString("c"), JsString("d")))))) - } - } - } -}