Repository: incubator-zeppelin
Updated Branches:
  refs/heads/master b51af33cb -> d5e87fb8b


http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/d5e87fb8/r/src/main/scala/scala/Console.scala
----------------------------------------------------------------------
diff --git a/r/src/main/scala/scala/Console.scala b/r/src/main/scala/scala/Console.scala
new file mode 100644
index 0000000..6b8f93c
--- /dev/null
+++ b/r/src/main/scala/scala/Console.scala
@@ -0,0 +1,491 @@
+/*                     __                                               *\
+Copyright (c) 2002-2016 EPFL
+Copyright (c) 2011-2016 Lightbend, Inc. (formerly Typesafe, Inc.)
+
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
+
+Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
+Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
+Neither the name of the EPFL nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission.
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*/
+
+
+package scala
+
+import java.io.{BufferedReader, InputStream, InputStreamReader,
+                IOException, OutputStream, PrintStream, Reader}
+import java.text.MessageFormat
+import scala.util.DynamicVariable
+
+
+/** Implements functionality for
+ *  printing Scala values on the terminal as well as reading specific values.
+ *  Also defines constants for marking up text on ANSI terminals.
+ *
+ *  @author  Matthias Zenger
+ *  @version 1.0, 03/09/2003
+ */
+object Console {
+
+  /** Foreground color for ANSI black */
+  final val BLACK      = "\033[30m"
+  /** Foreground color for ANSI red */
+  final val RED        = "\033[31m"
+  /** Foreground color for ANSI green */
+  final val GREEN      = "\033[32m"
+  /** Foreground color for ANSI yellow */
+  final val YELLOW     = "\033[33m"
+  /** Foreground color for ANSI blue */
+  final val BLUE       = "\033[34m"
+  /** Foreground color for ANSI magenta */
+  final val MAGENTA    = "\033[35m"
+  /** Foreground color for ANSI cyan */
+  final val CYAN       = "\033[36m"
+  /** Foreground color for ANSI white */
+  final val WHITE      = "\033[37m"
+
+  /** Background color for ANSI black */
+  final val BLACK_B    = "\033[40m"
+  /** Background color for ANSI red */
+  final val RED_B      = "\033[41m"
+  /** Background color for ANSI green */
+  final val GREEN_B    = "\033[42m"
+  /** Background color for ANSI yellow */
+  final val YELLOW_B   = "\033[43m"
+  /** Background color for ANSI blue */
+  final val BLUE_B     = "\033[44m"
+  /** Background color for ANSI magenta */
+  final val MAGENTA_B  = "\033[45m"
+  /** Background color for ANSI cyan */
+  final val CYAN_B     = "\033[46m"
+  /** Background color for ANSI white */
+  final val WHITE_B    = "\033[47m"
+
+  /** Reset ANSI styles */
+  final val RESET      = "\033[0m"
+  /** ANSI bold */
+  final val BOLD       = "\033[1m"
+  /** ANSI underlines */
+  final val UNDERLINED = "\033[4m"
+  /** ANSI blink */
+  final val BLINK      = "\033[5m"
+  /** ANSI reversed */
+  final val REVERSED   = "\033[7m"
+  /** ANSI invisible */
+  final val INVISIBLE  = "\033[8m"
+
+  // From Scala 2.10.5
+  // Start of rscala patch, which only takes effect if the RSCALA_TUNNELING environment variable is TRUE.
+  val baosOut = new java.io.ByteArrayOutputStream()
+  val baosErr = new java.io.ByteArrayOutputStream()
+  val psOut = new java.io.PrintStream(baosOut,true)
+  val psErr = new java.io.PrintStream(baosErr,true)
+  val originalOut = java.lang.System.out
+  val originalErr = java.lang.System.err
+  try {
+    if ( sys.env("RSCALA_TUNNELING") == "TRUE" ) {
+      java.lang.System.setOut(psOut)
+      java.lang.System.setErr(psErr)
+    }
+  } catch {
+    case _: Throwable =>
+  }
+  // End of rscala patch.
+
+  private val outVar = new DynamicVariable[PrintStream](java.lang.System.out)
+  private val errVar = new DynamicVariable[PrintStream](java.lang.System.err)
+  private val inVar = new DynamicVariable[BufferedReader](
+    new BufferedReader(new InputStreamReader(java.lang.System.in)))
+
+  /** The default output, can be overridden by `setOut` */
+  def out = outVar.value
+  /** The default error, can be overridden by `setErr` */
+  def err = errVar.value
+  /** The default input, can be overridden by `setIn` */
+  def in = inVar.value
+
+  /** Sets the default output stream.
+   *
+   *  @param out the new output stream.
+   */
+  def setOut(out: PrintStream) { outVar.value = out }
+
+  /** Sets the default output stream for the duration
+   *  of execution of one thunk.
+   *
+   *  @example {{{
+   *  withOut(Console.err) { println("This goes to default _error_") }
+   *  }}}
+   *
+   *  @param out the new output stream.
+   *  @param thunk the code to execute with
+   *               the new output stream active
+   *  @return the results of `thunk`
+   *  @see `withOut[T](out:OutputStream)(thunk: => T)`
+   */
+  def withOut[T](out: PrintStream)(thunk: =>T): T =
+    outVar.withValue(out)(thunk)
+
+  /** Sets the default output stream.
+   *
+   *  @param out the new output stream.
+   */
+  def setOut(out: OutputStream): Unit =
+    setOut(new PrintStream(out))
+
+  /** Sets the default output stream for the duration
+   *  of execution of one thunk.
+   *
+   *  @param out the new output stream.
+   *  @param thunk the code to execute with
+   *               the new output stream active
+   *  @return the results of `thunk`
+   *  @see `withOut[T](out:PrintStream)(thunk: => T)`
+   */
+  def withOut[T](out: OutputStream)(thunk: =>T): T =
+    withOut(new PrintStream(out))(thunk)
+
+
+  /** Sets the default error stream.
+   *
+   *  @param err the new error stream.
+   */
+  def setErr(err: PrintStream) { errVar.value = err }
+
+  /** Set the default error stream for the duration
+   *  of execution of one thunk.
+   *  @example {{{
+   *  withErr(Console.out) { println("This goes to default _out_") }
+   *  }}}
+   *
+   *  @param err the new error stream.
+   *  @param thunk the code to execute with
+   *               the new error stream active
+   *  @return the results of `thunk`
+   *  @see `withErr[T](err:OutputStream)(thunk: =>T)`
+   */
+  def withErr[T](err: PrintStream)(thunk: =>T): T =
+    errVar.withValue(err)(thunk)
+
+  /** Sets the default error stream.
+   *
+   *  @param err the new error stream.
+   */
+  def setErr(err: OutputStream): Unit =
+    setErr(new PrintStream(err))
+
+  /** Sets the default error stream for the duration
+   *  of execution of one thunk.
+   *
+   *  @param err the new error stream.
+   *  @param thunk the code to execute with
+   *               the new error stream active
+   *  @return the results of `thunk`
+   *  @see `withErr[T](err:PrintStream)(thunk: =>T)`
+   */
+  def withErr[T](err: OutputStream)(thunk: =>T): T =
+    withErr(new PrintStream(err))(thunk)
+
+
+  /** Sets the default input stream.
+   *
+   *  @param reader specifies the new input stream.
+   */
+  def setIn(reader: Reader) {
+    inVar.value = new BufferedReader(reader)
+  }
+
+  /** Sets the default input stream for the duration
+   *  of execution of one thunk.
+   *
+   *  @example {{{
+   *  val someFile:Reader = openFile("file.txt")
+   *  withIn(someFile) {
+   *    // Reads a line from file.txt instead of default input
+   *    println(readLine)
+   *  }
+   *  }}}
+   *
+   *  @param thunk the code to execute with
+   *               the new input stream active
+   *
+   * @return the results of `thunk`
+   * @see `withIn[T](in:InputStream)(thunk: =>T)`
+   */
+  def withIn[T](reader: Reader)(thunk: =>T): T =
+    inVar.withValue(new BufferedReader(reader))(thunk)
+
+  /** Sets the default input stream.
+   *
+   *  @param in the new input stream.
+   */
+  def setIn(in: InputStream) {
+    setIn(new InputStreamReader(in))
+  }
+
+  /** Sets the default input stream for the duration
+   *  of execution of one thunk.
+   *
+   *  @param in the new input stream.
+   *  @param thunk the code to execute with
+   *               the new input stream active
+   * @return the results of `thunk`
+   * @see `withIn[T](reader:Reader)(thunk: =>T)`
+   */
+  def withIn[T](in: InputStream)(thunk: =>T): T =
+    withIn(new InputStreamReader(in))(thunk)
+
+  /** Prints an object to `out` using its `toString` method.
+   *
+   *  @param obj the object to print; may be null.
+   */
+  def print(obj: Any) {
+    out.print(if (null == obj) "null" else obj.toString())
+  }
+
+  /** Flushes the output stream. This function is required when partial
+   *  output (i.e. output not terminated by a newline character) has
+   *  to be made visible on the terminal.
+   */
+  def flush() { out.flush() }
+
+  /** Prints a newline character on the default output.
+   */
+  def println() { out.println() }
+
+  /** Prints out an object to the default output, followed by a newline character.
+   *
+   *  @param x the object to print.
+   */
+  def println(x: Any) { out.println(x) }
+
+  /** Prints its arguments as a formatted string to the default output,
+   *  based on a string pattern (in a fashion similar to printf in C).
+   *
+   *  The interpretation of the formatting patterns is described in
+   *  <a href="" target="contentFrame" class="java/util/Formatter">
+   *  `java.util.Formatter`</a>.
+   *
+   *  @param text the pattern for formatting the arguments.
+   *  @param args the arguments used to instantiate the pattern.
+   *  @throws java.lang.IllegalArgumentException if there was a problem with the format string or arguments
+   */
+  def printf(text: String, args: Any*) { out.print(text format (args : _*)) }
+
+  /** Read a full line from the default input.  Returns `null` if the end of the
+   * input stream has been reached.
+   *
+   * @return the string read from the terminal or null if the end of stream was reached.
+   */
+   */
+  def readLine(): String = in.readLine()
+
+  /** Print formatted text to the default output and read a full line from the default input.
+   *  Returns `null` if the end of the input stream has been reached.
+   *
+   *  @param text the format of the text to print out, as in `printf`.
+   *  @param args the parameters used to instantiate the format, as in `printf`.
+   *  @return the string read from the default input
+   */
+  def readLine(text: String, args: Any*): String = {
+    printf(text, args: _*)
+    readLine()
+  }
+
+  /** Reads a boolean value from an entire line of the default input.
+   *  Has a fairly liberal interpretation of the input.
+   *
+   *  @return the boolean value read, or false if it couldn't be converted to a boolean
+   *  @throws java.io.EOFException if the end of the input stream has been reached.
+   */
+  def readBoolean(): Boolean = {
+    val s = readLine()
+    if (s == null)
+      throw new java.io.EOFException("Console has reached end of input")
+    else
+      s.toLowerCase() match {
+        case "true" => true
+        case "t" => true
+        case "yes" => true
+        case "y" => true
+        case _ => false
+      }
+  }
+
+  /** Reads a byte value from an entire line of the default input.
+   *
+   *  @return the Byte that was read
+   *  @throws java.io.EOFException if the end of the
+   *  input stream has been reached.
+   *  @throws java.lang.NumberFormatException if the value couldn't be converted to a Byte
+   */
+  def readByte(): Byte = {
+    val s = readLine()
+    if (s == null)
+      throw new java.io.EOFException("Console has reached end of input")
+    else
+      s.toByte
+  }
+
+  /** Reads a short value from an entire line of the default input.
+   *
+   *  @return the short that was read
+   *  @throws java.io.EOFException if the end of the
+   *  input stream has been reached.
+   *  @throws java.lang.NumberFormatException if the value couldn't be converted to a Short
+   */
+  def readShort(): Short = {
+    val s = readLine()
+    if (s == null)
+      throw new java.io.EOFException("Console has reached end of input")
+    else
+      s.toShort
+  }
+
+  /** Reads a char value from an entire line of the default input.
+   *
+   *  @return the Char that was read
+   *  @throws java.io.EOFException if the end of the
+   *  input stream has been reached.
+   *  @throws java.lang.StringIndexOutOfBoundsException if the line read from default input was empty
+   */
+  def readChar(): Char = {
+    val s = readLine()
+    if (s == null)
+      throw new java.io.EOFException("Console has reached end of input")
+    else
+      s charAt 0
+  }
+
+  /** Reads an int value from an entire line of the default input.
+   *
+   *  @return the Int that was read
+   *  @throws java.io.EOFException if the end of the
+   *  input stream has been reached.
+   *  @throws java.lang.NumberFormatException if the value couldn't be converted to an Int
+   */
+  def readInt(): Int = {
+    val s = readLine()
+    if (s == null)
+      throw new java.io.EOFException("Console has reached end of input")
+    else
+      s.toInt
+  }
+
+  /** Reads a long value from an entire line of the default input.
+   *
+   *  @return the Long that was read
+   *  @throws java.io.EOFException if the end of the
+   *  input stream has been reached.
+   *  @throws java.lang.NumberFormatException if the value couldn't be converted to a Long
+   */
+  def readLong(): Long = {
+    val s = readLine()
+    if (s == null)
+      throw new java.io.EOFException("Console has reached end of input")
+    else
+      s.toLong
+  }
+
+  /** Reads a float value from an entire line of the default input.
+   *  @return the Float that was read.
+   *  @throws java.io.EOFException if the end of the
+   *  input stream has been reached.
+   *  @throws java.lang.NumberFormatException if the value couldn't be converted to a Float
+   *
+   */
+  def readFloat(): Float = {
+    val s = readLine()
+    if (s == null)
+      throw new java.io.EOFException("Console has reached end of input")
+    else
+      s.toFloat
+  }
+
+  /** Reads a double value from an entire line of the default input.
+   *
+   *  @return the Double that was read.
+   *  @throws java.io.EOFException if the end of the
+   *  input stream has been reached.
+   *  @throws java.lang.NumberFormatException if the value couldn't be converted to a Double
+   */
+  def readDouble(): Double = {
+    val s = readLine()
+    if (s == null)
+      throw new java.io.EOFException("Console has reached end of input")
+    else
+      s.toDouble
+  }
+
+  /** Reads in some structured input (from the default input), specified by
+   *  a format specifier. See class `java.text.MessageFormat` for details of
+   *  the format specification.
+   *
+   *  @param format the format of the input.
+   *  @return a list of all extracted values.
+   *  @throws java.io.EOFException if the end of the input stream has been
+   *          reached.
+   */
+  def readf(format: String): List[Any] = {
+    val s = readLine()
+    if (s == null)
+      throw new java.io.EOFException("Console has reached end of input")
+    else
+      textComponents(new MessageFormat(format).parse(s))
+  }
+
+  /** Reads in some structured input (from the default input), specified by
+   *  a format specifier, returning only the first value extracted, according
+   *  to the format specification.
+   *
+   *  @param format format string, as accepted by `readf`.
+   *  @return The first value that was extracted from the input
+   */
+  def readf1(format: String): Any = readf(format).head
+
+  /** Reads in some structured input (from the default input), specified
+   *  by a format specifier, returning only the first two values extracted,
+   *  according to the format specification.
+   *
+   *  @param format format string, as accepted by `readf`.
+   *  @return A [[scala.Tuple2]] containing the first two values extracted
+   */
+  def readf2(format: String): (Any, Any) = {
+    val res = readf(format)
+    (res.head, res.tail.head)
+  }
+
+  /** Reads in some structured input (from the default input), specified
+   *  by a format specifier, returning only the first three values extracted,
+   *  according to the format specification.
+   *
+   *  @param format format string, as accepted by `readf`.
+   *  @return A [[scala.Tuple3]] containing the first three values extracted
+   */
+  def readf3(format: String): (Any, Any, Any) = {
+    val res = readf(format)
+    (res.head, res.tail.head, res.tail.tail.head)
+  }
+
+  private def textComponents(a: Array[AnyRef]): List[Any] = {
+    var i: Int = a.length - 1
+    var res: List[Any] = Nil
+    while (i >= 0) {
+      res = (a(i) match {
+        case x: java.lang.Boolean   => x.booleanValue()
+        case x: java.lang.Byte      => x.byteValue()
+        case x: java.lang.Short     => x.shortValue()
+        case x: java.lang.Character => x.charValue()
+        case x: java.lang.Integer   => x.intValue()
+        case x: java.lang.Long      => x.longValue()
+        case x: java.lang.Float     => x.floatValue()
+        case x: java.lang.Double    => x.doubleValue()
+        case x => x
+      }) :: res;
+      i -= 1
+    }
+    res
+  }
+}
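
This vendored Console mirrors scala.Console from Scala 2.10.5; the rscala patch above redirects System.out/System.err into in-memory buffers when the RSCALA_TUNNELING environment variable is TRUE, so JVM output can be tunneled back to R. A minimal sketch of the redirection API the object exposes (standard scala.Console semantics; the names below are illustrative):

    import java.io.ByteArrayOutputStream

    object RedirectSketch extends App {
      val buf = new ByteArrayOutputStream()
      // withOut routes Console.out (and thus Predef.println) into `buf`
      // for the duration of the thunk, then restores the previous stream.
      val answer = Console.withOut(buf) {
        println("captured, not printed")
        42
      }
      assert(answer == 42)
      assert(buf.toString.contains("captured"))
    }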

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/d5e87fb8/r/src/test/scala/org/apache/spark/api/r/RBackendHelperTest.scala
----------------------------------------------------------------------
diff --git a/r/src/test/scala/org/apache/spark/api/r/RBackendHelperTest.scala b/r/src/test/scala/org/apache/spark/api/r/RBackendHelperTest.scala
new file mode 100644
index 0000000..cdc314d
--- /dev/null
+++ b/r/src/test/scala/org/apache/spark/api/r/RBackendHelperTest.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.r
+
+import org.scalatest.FlatSpec
+import org.scalatest.Matchers._
+
+class RBackendHelperTest extends FlatSpec {
+
+  val backend : RBackendHelper = RBackendHelper()
+  val backend2 : RBackendHelper = RBackendHelper()
+
+  "RBackendHelper" should "create a SparkR backend" in {
+    val rbackend = backend
+    assert(true) // only looking for exceptions here
+  }
+
+  it should "initialize properly, returning a port > 0" in {
+    val port = backend.init()
+    assert(port > 0)
+  }
+
+  it should "start a thread" in {
+    val backend = backend2
+    backend.init()
+    val thread = backend.start()
+    thread shouldBe a [Thread]
+  }
+  
+  it should "close without error" in {
+    backend2.close
+    assert(true) // only looking for exceptions
+  }
+}
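
The test above pins down the lifecycle RBackendHelper is expected to support: construct, init() to obtain a listening port, start() to get the serving thread, close to shut down. A hedged usage sketch based only on the calls the test makes (RBackendHelper is project code, not a public API):

    val backend = RBackendHelper()
    val port = backend.init()          // port the SparkR client will connect to
    require(port > 0)
    val serving: Thread = backend.start()
    // ... hand `port` to the R side ...
    backend.close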

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/d5e87fb8/r/src/test/scala/org/apache/zeppelin/rinterpreter/RContextInitTest.scala
----------------------------------------------------------------------
diff --git a/r/src/test/scala/org/apache/zeppelin/rinterpreter/RContextInitTest.scala b/r/src/test/scala/org/apache/zeppelin/rinterpreter/RContextInitTest.scala
new file mode 100644
index 0000000..3d74e58
--- /dev/null
+++ b/r/src/test/scala/org/apache/zeppelin/rinterpreter/RContextInitTest.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.rinterpreter
+
+import java.io.{File, PrintWriter}
+import java.nio.file.{Files, Paths}
+
+import org.apache.zeppelin.rinterpreter.rscala.RClient
+import org.apache.zeppelin.rinterpreter.rscala.RClient._
+import org.scalatest.Matchers._
+import org.scalatest._
+
+class RContextInitTest extends FlatSpec {
+  import scala.sys.process._
+  var cmd: PrintWriter = null
+  val command = RClient.defaultRCmd +: RClient.defaultArguments
+  var processCmd : ProcessBuilder = null
+
+    "Process command" should "create a process builder" in {
+      processCmd = Process(command)
+      processCmd shouldBe a[ProcessBuilder]
+    }
+  it should "be persistent for testing purposes" in {
+    processCmd shouldBe a [ProcessBuilder]
+  }
+
+  var processIO : ProcessIO = null
+
+  "Creating Process IO" should "not throw an exception" in {
+    processIO = new ProcessIO(
+      o => {
+        cmd = new PrintWriter(o)
+      },
+      reader("STDOUT DEBUG: "),
+      reader("STDERR DEBUG: "),
+      true
+    )
+    processIO shouldBe a [ProcessIO]
+  }
+  var portsFile : File = null
+    "A temp file " should "be created" in {
+      portsFile = File.createTempFile("rscala-", "")
+      assertResult(true) {portsFile.exists()}
+    }
+  var processInstance : Process = null
+
+  "Process instance" should "launch" in {
+    processInstance = processCmd.run(processIO)
+    assert(true)
+  }
+  var libpath : String = null
+  "RZeppelin R Package" should "be found" in {
+    libpath  = if (Files.exists(Paths.get("R/lib"))) "R/lib"
+    else if (Files.exists(Paths.get("../R/lib"))) "../R/lib"
+    else throw new RuntimeException("Could not find rzeppelin - it must be in either R/lib or ../R/lib")
+    assert(Files.exists(Paths.get(libpath + "/rzeppelin")))
+  }
+  var snippet : String = null
+
+  "Creating the snippit" should "be impossible to fail" in {
+    snippet =     s"""
+library(lib.loc="$libpath", rzeppelin)
+rzeppelin:::rServe(rzeppelin:::newSockets('${portsFile.getAbsolutePath.replaceAll(File.separator, "/")}',debug=FALSE,timeout=60))
+q(save='no')"""
+    assert(true)
+  }
+  "Cmd" should "stop being null" in {
+    while (cmd == null) Thread.sleep(100)
+    assert(cmd != null)
+  }
+  it should "accept the snippet" in {
+    cmd.println(snippet)
+    cmd.flush()
+    assert(true)
+  }
+
+  var sockets : ScalaSockets = null
+
+  "Scala Sockets" should "be created and signal OK" in {
+    sockets = new ScalaSockets(portsFile.getAbsolutePath)
+    sockets.out.writeInt(RClient.Protocol.OK)
+    sockets.out.flush()
+    assert(true)
+  }
+  "The R and Scala versions" should "match" in {
+    assert(RClient.readString(sockets.in) == org.apache.zeppelin.rinterpreter.rscala.Version)
+  }
+  var rcon : RContext = null
+  "Creating an RContext" should "not fail" in {
+    rcon = new RContext(sockets, false)
+  }
+  "An open RContext" should "destroy safely" in {
+    rcon.close()
+    assertResult(false) {
+      rcon.isOpen
+    }
+  }
+}
\ No newline at end of file
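
The init test drives the same process plumbing RContext uses for real: build an R command line, wire its stdin/stdout/stderr through a scala.sys.process.ProcessIO, and push a bootstrap snippet once the stdin hook fires. A condensed standard-library sketch (the R command line and the polling loop are illustrative, not the project's exact code):

    import java.io.PrintWriter
    import scala.sys.process._

    object LaunchRSketch extends App {
      val command = Seq("R", "--no-save")        // illustrative R command line
      @volatile var toR: PrintWriter = null
      val io = new ProcessIO(
        in  => { toR = new PrintWriter(in) },    // hook the child's stdin
        out => scala.io.Source.fromInputStream(out).getLines.foreach(println),
        err => scala.io.Source.fromInputStream(err).getLines.foreach(println),
        true                                     // daemonize the IO threads
      )
      val proc = Process(command).run(io)
      while (toR == null) Thread.sleep(100)      // wait for the stdin hook, as the test does
      toR.println("q(save='no')")                // send a bootstrap snippet
      toR.flush()
      proc.exitValue()                           // block until R exits
    }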

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/d5e87fb8/r/src/test/scala/org/apache/zeppelin/rinterpreter/RContextTest.scala
----------------------------------------------------------------------
diff --git a/r/src/test/scala/org/apache/zeppelin/rinterpreter/RContextTest.scala b/r/src/test/scala/org/apache/zeppelin/rinterpreter/RContextTest.scala
new file mode 100644
index 0000000..8b11156
--- /dev/null
+++ b/r/src/test/scala/org/apache/zeppelin/rinterpreter/RContextTest.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.rinterpreter
+
+import java.util.Properties
+
+import org.apache.zeppelin.RTest
+import org.apache.zeppelin.rinterpreter.rscala.RException
+import org.apache.zeppelin.spark.SparkInterpreter
+import org.scalatest.Matchers._
+import org.scalatest._
+
+class RContextTest extends FlatSpec {
+  RContext.resetRcon()
+  
+  val rcon = RContext(new Properties(), "test")
+  
+  "The RContext Singleton" should "create an RContext without Spark" in { () =>
+    rcon shouldBe a[RContext]
+  }
+  
+  "The RContext" should "be openable without spark" in { () =>
+    rcon.open(None)
+    assert(rcon.isOpen)
+  }
+
+  it should "be able to confirm that stats is available" taggedAs(RTest) in { 
() =>
+    assertResult(true) {
+      rcon.testRPackage("stats")
+    }
+  }
+
+  it should "be able to confirm that a bogus package is not available"  
taggedAs(RTest) in { () =>
+    assertResult(false) {
+      rcon.testRPackage("thisisagarbagepackagename")
+    }
+  }
+
+  it should "be able to add 2 + 2"  taggedAs(RTest) in { () =>
+    assertResult(4) {
+      rcon.evalI0("2 + 2")
+    }
+  }
+  it should "be able to return a vector"  taggedAs(RTest) in { () =>
+    assertResult(10) {
+      rcon.evalI1("1:10").length
+    }
+  }
+  it should "be able to return a string"  taggedAs(RTest) in { () =>
+    
+    assertResult("hello world") {
+      rcon.evalS0("'hello world'")
+    }
+  }
+  it should "be able to return a vector of strings"  taggedAs(RTest)  in { () 
=>
+    
+    assertResult(26) {
+      rcon.evalS1("LETTERS").length
+    }
+  }
+
+  it should "throw an RException if told to evaluate garbage code"  
taggedAs(RTest)  in { () =>
+    
+    intercept[RException] {
+      rcon.eval("funkyfunction()")
+    }
+  }
+
+//  it should "Throw an exception if we try to initialize SparkR without a 
SQLContext" in {() =>
+//
+//    intercept[RuntimeException] {
+//      rcon.initializeSparkRTest()
+//    }
+//  }
+
+  it should "have rzeppelin available"  taggedAs(RTest) in { () =>
+    
+    assertResult(true) {
+      rcon.testRPackage("rzeppelin")
+    }
+  }
+  it should "have evaluate available"  taggedAs(RTest) in { () =>
+    
+    assertResult(true) {
+      rcon.testRPackage("evaluate")
+    }
+  }
+  it should "have repr available"  taggedAs(RTest) in { () =>
+    
+    assertResult(true) {
+      rcon.testRPackage("repr")
+    }
+  }
+  it should "also close politely"  taggedAs(RTest) in { () =>
+    
+    rcon.close()
+    assertResult(2) {rcon.isOpen}
+  }
+}
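
The taggedAs(RTest) markers let the build exclude these tests on machines without a working R installation; the tag object itself is added later in this commit (r/src/test/scala/org/apache/zeppelin/rinterpreter/package.scala). A minimal sketch of defining and using such a tag in ScalaTest:

    import org.scalatest.{FlatSpec, Tag}

    object RTest extends Tag("RTest")

    class TaggedSpec extends FlatSpec {
      "R-dependent behavior" should "only run where R exists" taggedAs(RTest) in {
        assert(2 + 2 == 4)
      }
    }
    // Excluded at build time, e.g. via the scalatest-maven-plugin's
    // tagsToExclude configuration (or -l RTest with the ScalaTest Runner).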

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/d5e87fb8/r/src/test/scala/org/apache/zeppelin/rinterpreter/RInterpreterTest.scala
----------------------------------------------------------------------
diff --git a/r/src/test/scala/org/apache/zeppelin/rinterpreter/RInterpreterTest.scala b/r/src/test/scala/org/apache/zeppelin/rinterpreter/RInterpreterTest.scala
new file mode 100644
index 0000000..4b59b00
--- /dev/null
+++ b/r/src/test/scala/org/apache/zeppelin/rinterpreter/RInterpreterTest.scala
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.rinterpreter
+
+import java.util.Properties
+
+import org.apache.zeppelin.RTest
+import org.apache.zeppelin.interpreter.{Interpreter, InterpreterContext, InterpreterResult, InterpreterGroup}
+import org.scalatest.Matchers._
+import org.scalatest._
+import java.util.ArrayList
+
+class RInterpreterTest extends FlatSpec {
+
+  RContext.resetRcon()
+
+  class RIntTester extends RInterpreter(new Properties(), startSpark = false) {
+
+    def interpret(s: String, interpreterContext: InterpreterContext): InterpreterResult = {
+      val result : Array[String] = rContext.evalS1(s)
+      new InterpreterResult(InterpreterResult.Code.SUCCESS, result.mkString("\n"))
+    }
+  }
+  val rint = new RIntTester()
+
+  "An RInterpreter" should "exist" in {
+    assert(rint != null)
+  }
+
+  it should "not complain when we assign it a group" in {
+    val grp : InterpreterGroup = new InterpreterGroup("test")
+    val lst : ArrayList[Interpreter] = new ArrayList[Interpreter]()
+    lst.add(rint)
+    grp.put(rint.getClassName(), lst)
+    rint.setInterpreterGroup(grp)
+  }
+
+  it should "create a fresh rContext when we ask for one" in {
+    assert(! rint.getrContext.isOpen)
+  }
+
+  it should "open"  taggedAs(RTest) in {
+    rint.open()
+    assert(rint.getrContext.isOpen)
+  }
+
+  it should "have rzeppelin available"  taggedAs(RTest) in {
+    assume(rint.getrContext.isOpen)
+    assert(rint.getrContext.testRPackage("rzeppelin"))
+  }
+  it should "have an rContext able to do simple addition" taggedAs(RTest)  in {
+    assume(rint.getrContext.isOpen)
+    assert(rint.getrContext.evalI0("2 + 2") == 4)
+  }
+
+
+
+  it should "have a functional completion function" taggedAs(RTest) in {
+    val result = rint.hiddenCompletion("hi", 3)
+    result should (contain ("hist"))
+  }
+
+  it should "have a working progress meter" in {
+    rint.getrContext.setProgress(50)
+    assertResult(50) {
+      rint.getrContext.getProgress
+    }
+  }
+
+  it should "have persistent properties" in {
+    val props = new Properties()
+    props.setProperty("hello", "world")
+    rint.setProperty(props)
+    assertResult("world") {
+      rint.getProperty("hello")
+    }
+  }
+
+  var rint2 : RIntTester = null
+
+  it should "Share RContexts if they share the same InterpreterGroup" in {
+    rint2 = new RIntTester()
+    val lst : ArrayList[Interpreter] = new ArrayList[Interpreter]()
+    lst.add(rint2)
+    val grp = rint.getInterpreterGroup()
+    grp.put(rint2.getClassName(), lst)
+    rint2.setInterpreterGroup(grp)
+    rint2.open()
+    rint.getrContext should be theSameInstanceAs rint2.getrContext
+  }
+
+  "Opening the second RInterpreter" should "not have closed the first 
RContext" in {
+    assert(rint.getrContext.isOpen)
+  }
+
+  var rint3 : RIntTester = null
+
+  "An RInterpreter in a different InterpreterGroup" should "have a different R 
Context" in {
+    rint3 = new RIntTester()
+    val grp : InterpreterGroup = new InterpreterGroup("othertest")
+    val lst : ArrayList[Interpreter] = new ArrayList[Interpreter]()
+    lst.add(rint3)
+    grp.put(rint3.getClassName(), lst)
+    rint3.setInterpreterGroup(grp)
+    rint3.open()
+    rint3.getrContext shouldNot be theSameInstanceAs rint2.getrContext
+  }
+
+  "The first RInterpreter" should "close politely" in {
+    rint.close()
+    assert(!rint.getrContext.isOpen)
+  }
+
+  "and so" should "the other one" in {
+    rint2.close()
+    assert(!rint2.getrContext.isOpen)
+  }
+
+  "and " should "the third one" in {
+    rint3.close()
+    assert(!rint2.getrContext.isOpen)
+  }
+
+//  fixture.sparky.close()
+
+}
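
The sharing assertions above are the design point of this suite: interpreters registered in the same InterpreterGroup reuse one RContext (one R process), while a different group gets its own. The registration pattern the tests repeat could be factored as follows (a sketch using the Zeppelin types already imported in the test):

    import java.util.{ArrayList => JArrayList}
    import org.apache.zeppelin.interpreter.{Interpreter, InterpreterGroup}

    def register(interp: Interpreter, grp: InterpreterGroup): Unit = {
      val lst = new JArrayList[Interpreter]()
      lst.add(interp)
      grp.put(interp.getClassName(), lst)
      interp.setInterpreterGroup(grp)
    }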

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/d5e87fb8/r/src/test/scala/org/apache/zeppelin/rinterpreter/WrapperTest.scala
----------------------------------------------------------------------
diff --git a/r/src/test/scala/org/apache/zeppelin/rinterpreter/WrapperTest.scala b/r/src/test/scala/org/apache/zeppelin/rinterpreter/WrapperTest.scala
new file mode 100644
index 0000000..43fb0d2
--- /dev/null
+++ b/r/src/test/scala/org/apache/zeppelin/rinterpreter/WrapperTest.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.rinterpreter
+
+import java.util
+import java.util.Properties
+
+import org.apache.zeppelin.interpreter.{Interpreter, InterpreterGroup, InterpreterResult}
+import org.scalatest.FlatSpec
+import java.util.List
+import org.scalatest.Matchers._
+
+class WrapperTest extends FlatSpec {
+  RContext.resetRcon()
+
+  val repl: RRepl = new RRepl(new Properties(), false)
+  val group : InterpreterGroup = new InterpreterGroup()
+  var lst = new util.LinkedList[Interpreter]()
+  lst.add(repl)
+  group.put(repl.getClassName(), lst)
+  repl.setInterpreterGroup(group)
+
+  "The R REPL" should "exist and be of the right class" in {
+
+    repl shouldBe a[RRepl]
+  }
+
+  it should "Have a RRepl Interpreter inside" in {
+    repl.getInnerInterpreter shouldBe a[RReplInterpreter]
+  }
+  val repi = repl.getInnerInterpreter.asInstanceOf[RReplInterpreter]
+
+  it should "have a fresh rContext" in {
+    assert(!repi.getrContext.isOpen)
+  }
+
+  val knitr: KnitR = new KnitR(new Properties(), false)
+  lst = new util.LinkedList[Interpreter]()
+  lst.add(knitr)
+  group.put(knitr.getClassName(), lst)
+  knitr.setInterpreterGroup(group)
+
+  "The KnitR wrapper" should "exist and be of the right class" in {
+    knitr shouldBe a[KnitR]
+    }
+    it should "have a KnitRInterpreter inside" in {
+      knitr.getInnerInterpreter shouldBe a [KnitRInterpreter]
+    }
+
+  it should "share the RContext" in {
+    knitr.getInnerInterpreter.asInstanceOf[KnitRInterpreter].getrContext should be theSameInstanceAs repi.getrContext
+  }
+
+  it should "open without error" in {
+    knitr.open()
+    assert(knitr.getInnerInterpreter.asInstanceOf[KnitRInterpreter].getrContext.isOpen)
+  }
+
+  it should "produce HTML in response to a simple query" in {
+    val result = knitr.interpret(
+      """
+        |```{r}
+        |2 + 2
+        |```
+      """.stripMargin, null)
+    withClue(result.message()) {
+      result should have (
+      'code (InterpreterResult.Code.SUCCESS),
+      'type (InterpreterResult.Type.HTML)
+      )
+    }
+  }
+
+  it should "close properly" in {
+    repi.getrContext.close()
+    assertResult(false) {
+      repi.getrContext.isOpen
+    }
+  }
+
+  "Just in case there are two rContexts, the other one" should "close properly 
also" in {
+    val rcon = 
knitr.getInnerInterpreter.asInstanceOf[KnitRInterpreter].getrContext
+    rcon.close()
+    assertResult(false) {
+      rcon.isOpen
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/d5e87fb8/r/src/test/scala/org/apache/zeppelin/rinterpreter/package.scala
----------------------------------------------------------------------
diff --git a/r/src/test/scala/org/apache/zeppelin/rinterpreter/package.scala b/r/src/test/scala/org/apache/zeppelin/rinterpreter/package.scala
new file mode 100644
index 0000000..eceeec5
--- /dev/null
+++ b/r/src/test/scala/org/apache/zeppelin/rinterpreter/package.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin
+
+import org.scalatest.Tag
+
+object RTest extends Tag("RTest")
+object SparkTest extends Tag("SparkTest")

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/d5e87fb8/spark/pom.xml
----------------------------------------------------------------------
diff --git a/spark/pom.xml b/spark/pom.xml
index a1c28af..ece7467 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -231,7 +231,6 @@
       <scope>provided</scope>
     </dependency>
 
-
     <!--TEST-->
     <dependency>
       <groupId>org.scalatest</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/d5e87fb8/spark/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkVersion.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
index eb1c0a2..2fa716b 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
@@ -72,7 +72,6 @@ public class SparkVersion {
     return olderThan(MIN_SUPPORTED_VERSION) || newerThanEquals(UNSUPPORTED_FUTURE_VERSION);
   }
 
-
   public static SparkVersion fromVersionString(String versionString) {
     return new SparkVersion(versionString);
   }
@@ -81,6 +80,10 @@ public class SparkVersion {
     return this.newerThanEquals(SPARK_1_2_0);
   }
 
+  public boolean isSparkRSupported() {
+    return this.newerThanEquals(SPARK_1_4_0);
+  }
+
   public boolean hasDataFrame() {
     return this.newerThanEquals(SPARK_1_4_0);
   }
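
The new isSparkRSupported() gate mirrors hasDataFrame(): SparkR first shipped with Spark 1.4.0, so any version at or above 1.4.0 qualifies. An illustrative call site (hedged; only the methods visible in this diff are assumed):

    val version = SparkVersion.fromVersionString("1.4.1")
    if (version.isSparkRSupported()) {
      // safe to launch the SparkR backend for %r paragraphs
    }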

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/d5e87fb8/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
index 5a489fa..cdd1806 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
@@ -41,187 +41,193 @@ import com.google.gson.Gson;
  * Spark cluster is started by CI server using testing/startSparkCluster.sh
  */
 public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
-  Gson gson = new Gson();
-
-  @BeforeClass
-  public static void init() throws Exception {
-    AbstractTestRestApi.startUp();
-  }
-
-  @AfterClass
-  public static void destroy() throws Exception {
-    AbstractTestRestApi.shutDown();
-  }
-
-  private void waitForFinish(Paragraph p) {
-    while (p.getStatus() != Status.FINISHED
-        && p.getStatus() != Status.ERROR
-        && p.getStatus() != Status.ABORT) {
-      try {
-        Thread.sleep(100);
-      } catch (InterruptedException e) {
-        LOG.error("Exception in WebDriverManager while getWebDriver ", e);
-      }
+    Gson gson = new Gson();
+
+    @BeforeClass
+    public static void init() throws Exception {
+        AbstractTestRestApi.startUp();
     }
-  }
-
-  @Test
-  public void basicRDDTransformationAndActionTest() throws IOException {
-    // create new note
-    Note note = ZeppelinServer.notebook.createNote();
-
-    // run markdown paragraph, again
-    Paragraph p = note.addParagraph();
-    Map config = p.getConfig();
-    config.put("enabled", true);
-    p.setConfig(config);
-    p.setText("%spark print(sc.parallelize(1 to 10).reduce(_ + _))");
-    note.run(p.getId());
-    waitForFinish(p);
-    assertEquals(Status.FINISHED, p.getStatus());
-    assertEquals("55", p.getResult().message());
-    ZeppelinServer.notebook.removeNote(note.id());
-  }
-
-  @Test
-  public void pySparkTest() throws IOException {
-    // create new note
-    Note note = ZeppelinServer.notebook.createNote();
-    int sparkVersion = getSparkVersionNumber(note);
-
-    if (isPyspark() && sparkVersion >= 12) {   // pyspark supported from 1.2.1
-      // run markdown paragraph, again
-      Paragraph p = note.addParagraph();
-      Map config = p.getConfig();
-      config.put("enabled", true);
-      p.setConfig(config);
-      p.setText("%pyspark print(sc.parallelize(range(1, 11)).reduce(lambda a, 
b: a + b))");
-      note.run(p.getId());
-      waitForFinish(p);
-      assertEquals(Status.FINISHED, p.getStatus());
-      assertEquals("55\n", p.getResult().message());
+
+    @AfterClass
+    public static void destroy() throws Exception {
+        AbstractTestRestApi.shutDown();
     }
-    ZeppelinServer.notebook.removeNote(note.id());
-  }
-
-  @Test
-  public void pySparkAutoConvertOptionTest() throws IOException {
-    // create new note
-    Note note = ZeppelinServer.notebook.createNote();
-
-    int sparkVersion = getSparkVersionNumber(note);
-
-    if (isPyspark() && sparkVersion >= 14) {   // auto_convert enabled from spark 1.4
-      // run markdown paragraph, again
-      Paragraph p = note.addParagraph();
-      Map config = p.getConfig();
-      config.put("enabled", true);
-      p.setConfig(config);
-      p.setText("%pyspark\nfrom pyspark.sql.functions import *\n"
-          + "print(sqlContext.range(0, 10).withColumn('uniform', rand(seed=10) 
* 3.14).count())");
-      note.run(p.getId());
-      waitForFinish(p);
-      assertEquals(Status.FINISHED, p.getStatus());
-      assertEquals("10\n", p.getResult().message());
+
+    private void waitForFinish(Paragraph p) {
+        while (p.getStatus() != Status.FINISHED
+                && p.getStatus() != Status.ERROR
+                && p.getStatus() != Status.ABORT) {
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException e) {
+                LOG.error("Exception in WebDriverManager while getWebDriver ", e);
+            }
+        }
     }
-    ZeppelinServer.notebook.removeNote(note.id());
-  }
-
-  @Test
-  public void zRunTest() throws IOException {
-    // create new note
-    Note note = ZeppelinServer.notebook.createNote();
-    Paragraph p0 = note.addParagraph();
-    Map config0 = p0.getConfig();
-    config0.put("enabled", true);
-    p0.setConfig(config0);
-    p0.setText("%spark z.run(1)");
-    Paragraph p1 = note.addParagraph();
-    Map config1 = p1.getConfig();
-    config1.put("enabled", true);
-    p1.setConfig(config1);
-    p1.setText("%spark val a=10");
-    Paragraph p2 = note.addParagraph();
-    Map config2 = p2.getConfig();
-    config2.put("enabled", true);
-    p2.setConfig(config2);
-    p2.setText("%spark print(a)");
-
-    note.run(p0.getId());
-    waitForFinish(p0);
-    assertEquals(Status.FINISHED, p0.getStatus());
-
-    note.run(p2.getId());
-    waitForFinish(p2);
-    assertEquals(Status.FINISHED, p2.getStatus());
-    assertEquals("10", p2.getResult().message());
-
-    ZeppelinServer.notebook.removeNote(note.id());
-  }
-
-  @Test
-  public void pySparkDepLoaderTest() throws IOException {
-    // create new note
-    Note note = ZeppelinServer.notebook.createNote();
-
-    if (isPyspark() && getSparkVersionNumber(note) >= 14) {
-      // restart spark interpreter
-      List<InterpreterSetting> settings =
-          ZeppelinServer.notebook.getBindedInterpreterSettings(note.id());
-
-      for (InterpreterSetting setting : settings) {
-        if (setting.getGroup().equals("spark")) {
-          ZeppelinServer.notebook.getInterpreterFactory().restart(setting.id());
-          break;
+
+    @Test
+    public void basicRDDTransformationAndActionTest() throws IOException {
+        // create new note
+        Note note = ZeppelinServer.notebook.createNote();
+
+        // run markdown paragraph, again
+        Paragraph p = note.addParagraph();
+        Map config = p.getConfig();
+        config.put("enabled", true);
+        p.setConfig(config);
+        p.setText("%spark print(sc.parallelize(1 to 10).reduce(_ + _))");
+        note.run(p.getId());
+        waitForFinish(p);
+        assertEquals(Status.FINISHED, p.getStatus());
+        assertEquals("55", p.getResult().message());
+        ZeppelinServer.notebook.removeNote(note.id());
+    }
+
+    @Test
+    public void pySparkTest() throws IOException {
+        // create new note
+        Note note = ZeppelinServer.notebook.createNote();
+        note.setName("note");
+        int sparkVersion = getSparkVersionNumber(note);
+
+            if (isPyspark() && sparkVersion >= 12) {   // pyspark supported from 1.2.1
+            // run markdown paragraph, again
+            Paragraph p = note.addParagraph();
+            Map config = p.getConfig();
+            config.put("enabled", true);
+            p.setConfig(config);
+            p.setText("%pyspark print(sc.parallelize(range(1, 
11)).reduce(lambda a, b: a + b))");
+//            p.getRepl("org.apache.zeppelin.spark.SparkInterpreter").open();
+            note.run(p.getId());
+            waitForFinish(p);
+            assertEquals(Status.FINISHED, p.getStatus());
+            assertEquals("55\n", p.getResult().message());
         }
-      }
-
-      // load dep
-      Paragraph p0 = note.addParagraph();
-      Map config = p0.getConfig();
-      config.put("enabled", true);
-      p0.setConfig(config);
-      p0.setText("%dep z.load(\"com.databricks:spark-csv_2.11:1.2.0\")");
-      note.run(p0.getId());
-      waitForFinish(p0);
-      assertEquals(Status.FINISHED, p0.getStatus());
-
-      // write test csv file
-      File tmpFile = File.createTempFile("test", "csv");
-      FileUtils.write(tmpFile, "a,b\n1,2");
-
-      // load data using libraries from dep loader
-      Paragraph p1 = note.addParagraph();
-      p1.setConfig(config);
-      p1.setText("%pyspark\n" +
-        "from pyspark.sql import SQLContext\n" +
-        "print(sqlContext.read.format('com.databricks.spark.csv')" +
-        ".load('"+ tmpFile.getAbsolutePath() +"').count())");
-      note.run(p1.getId());
-
-      waitForFinish(p1);
-      assertEquals(Status.FINISHED, p1.getStatus());
-      assertEquals("2\n", p1.getResult().message());
+        ZeppelinServer.notebook.removeNote(note.id());
+    }
+
+    @Test
+    public void pySparkAutoConvertOptionTest() throws IOException {
+        // create new note
+        Note note = ZeppelinServer.notebook.createNote();
+        note.setName("note");
+
+        int sparkVersion = getSparkVersionNumber(note);
+
+        if (isPyspark() && sparkVersion >= 14) {   // auto_convert enabled from spark 1.4
+            // run markdown paragraph, again
+            Paragraph p = note.addParagraph();
+            Map config = p.getConfig();
+            config.put("enabled", true);
+            p.setConfig(config);
+            p.setText("%pyspark\nfrom pyspark.sql.functions import *\n"
+                    + "print(sqlContext.range(0, 10).withColumn('uniform', 
rand(seed=10) * 3.14).count())");
+//            p.getRepl("org.apache.zeppelin.spark.SparkInterpreter").open();
+            note.run(p.getId());
+            waitForFinish(p);
+            assertEquals(Status.FINISHED, p.getStatus());
+            assertEquals("10\n", p.getResult().message());
+        }
+        ZeppelinServer.notebook.removeNote(note.id());
+    }
+
+    @Test
+    public void zRunTest() throws IOException {
+        // create new note
+        Note note = ZeppelinServer.notebook.createNote();
+        Paragraph p0 = note.addParagraph();
+        Map config0 = p0.getConfig();
+        config0.put("enabled", true);
+        p0.setConfig(config0);
+        p0.setText("%spark z.run(1)");
+        Paragraph p1 = note.addParagraph();
+        Map config1 = p1.getConfig();
+        config1.put("enabled", true);
+        p1.setConfig(config1);
+        p1.setText("%spark val a=10");
+        Paragraph p2 = note.addParagraph();
+        Map config2 = p2.getConfig();
+        config2.put("enabled", true);
+        p2.setConfig(config2);
+        p2.setText("%spark print(a)");
+
+        note.run(p0.getId());
+        waitForFinish(p0);
+        assertEquals(Status.FINISHED, p0.getStatus());
+
+        note.run(p2.getId());
+        waitForFinish(p2);
+        assertEquals(Status.FINISHED, p2.getStatus());
+        assertEquals("10", p2.getResult().message());
+
+        ZeppelinServer.notebook.removeNote(note.id());
+    }
+
+    @Test
+    public void pySparkDepLoaderTest() throws IOException {
+        // create new note
+        Note note = ZeppelinServer.notebook.createNote();
+
+        if (isPyspark() && getSparkVersionNumber(note) >= 14) {
+            // restart spark interpreter
+            List<InterpreterSetting> settings =
+                    ZeppelinServer.notebook.getBindedInterpreterSettings(note.id());
+
+            for (InterpreterSetting setting : settings) {
+                if (setting.getGroup().equals("spark")) {
+                    ZeppelinServer.notebook.getInterpreterFactory().restart(setting.id());
+                    break;
+                }
+            }
+
+            // load dep
+            Paragraph p0 = note.addParagraph();
+            Map config = p0.getConfig();
+            config.put("enabled", true);
+            p0.setConfig(config);
+            p0.setText("%dep z.load(\"com.databricks:spark-csv_2.11:1.2.0\")");
+            note.run(p0.getId());
+            waitForFinish(p0);
+            assertEquals(Status.FINISHED, p0.getStatus());
+
+            // write test csv file
+            File tmpFile = File.createTempFile("test", "csv");
+            FileUtils.write(tmpFile, "a,b\n1,2");
+
+            // load data using libraries from dep loader
+            Paragraph p1 = note.addParagraph();
+            p1.setConfig(config);
+            p1.setText("%pyspark\n" +
+                    "from pyspark.sql import SQLContext\n" +
+                    "print(sqlContext.read.format('com.databricks.spark.csv')" 
+
+                    ".load('"+ tmpFile.getAbsolutePath() +"').count())");
+            note.run(p1.getId());
+
+            waitForFinish(p1);
+            assertEquals(Status.FINISHED, p1.getStatus());
+            assertEquals("2\n", p1.getResult().message());
+        }
+    }
+
+    /**
+     * Get spark version number as a numerical value.
+     * eg. 1.1.x => 11, 1.2.x => 12, 1.3.x => 13 ...
+     */
+    private int getSparkVersionNumber(Note note) {
+        Paragraph p = note.addParagraph();
+        note.setName("note");
+        Map config = p.getConfig();
+        config.put("enabled", true);
+        p.setConfig(config);
+        p.setText("%spark print(sc.version)");
+//        p.getRepl("org.apache.zeppelin.spark.SparkInterpreter").open();
+        note.run(p.getId());
+        waitForFinish(p);
+        assertEquals(Status.FINISHED, p.getStatus());
+        String sparkVersion = p.getResult().message();
+        System.out.println("Spark version detected " + sparkVersion);
+        String[] split = sparkVersion.split("\\.");
+        int version = Integer.parseInt(split[0]) * 10 + Integer.parseInt(split[1]);
+        return version;
     }
-  }
-
-  /**
-   * Get spark version number as a numerical value.
-   * eg. 1.1.x => 11, 1.2.x => 12, 1.3.x => 13 ...
-   */
-  private int getSparkVersionNumber(Note note) {
-    Paragraph p = note.addParagraph();
-    Map config = p.getConfig();
-    config.put("enabled", true);
-    p.setConfig(config);
-    p.setText("%spark print(sc.version)");
-    note.run(p.getId());
-    waitForFinish(p);
-    assertEquals(Status.FINISHED, p.getStatus());
-    String sparkVersion = p.getResult().message();
-    System.out.println("Spark version detected " + sparkVersion);
-    String[] split = sparkVersion.split("\\.");
-    int version = Integer.parseInt(split[0]) * 10 + Integer.parseInt(split[1]);
-    return version;
-  }
-}
+}
\ No newline at end of file
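
getSparkVersionNumber maps "major.minor.x" onto a comparable integer, major * 10 + minor, so "1.4.1" becomes 14 and the sparkVersion >= 12 / >= 14 guards above read as "at least Spark 1.2 / 1.4". The same computation in isolation:

    def versionNumber(sparkVersion: String): Int = {
      val Array(major, minor, _*) = sparkVersion.split("\\.")
      major.toInt * 10 + minor.toInt
    }
    assert(versionNumber("1.4.1") == 14)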

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/d5e87fb8/zeppelin-web/bower.json
----------------------------------------------------------------------
diff --git a/zeppelin-web/bower.json b/zeppelin-web/bower.json
index 330c408..c5fc7ab 100644
--- a/zeppelin-web/bower.json
+++ b/zeppelin-web/bower.json
@@ -47,6 +47,7 @@
         "src-noconflict/mode-sql.js",
         "src-noconflict/mode-markdown.js",
         "src-noconflict/mode-sh.js",
+        "src-noconflict/mode-r.js",
         "src-noconflict/keybinding-emacs.js",
         "src-noconflict/ext-language_tools.js",
         "src-noconflict/theme-chrome.js"

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/d5e87fb8/zeppelin-web/pom.xml
----------------------------------------------------------------------
diff --git a/zeppelin-web/pom.xml b/zeppelin-web/pom.xml
index 8878e9a..6fa62ae 100644
--- a/zeppelin-web/pom.xml
+++ b/zeppelin-web/pom.xml
@@ -124,6 +124,7 @@
             </goals>
             <configuration>
               <arguments>build</arguments>
+              <arguments>--force</arguments>
             </configuration>
           </execution>
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/d5e87fb8/zeppelin-web/src/index.html
----------------------------------------------------------------------
diff --git a/zeppelin-web/src/index.html b/zeppelin-web/src/index.html
index bd0dbae..5c3a8ad 100644
--- a/zeppelin-web/src/index.html
+++ b/zeppelin-web/src/index.html
@@ -108,6 +108,7 @@ limitations under the License.
     <script src="bower_components/ace-builds/src-noconflict/mode-sql.js"></script>
     <script src="bower_components/ace-builds/src-noconflict/mode-markdown.js"></script>
     <script src="bower_components/ace-builds/src-noconflict/mode-sh.js"></script>
+    <script src="bower_components/ace-builds/src-noconflict/mode-r.js"></script>
     <script src="bower_components/ace-builds/src-noconflict/keybinding-emacs.js"></script>
     <script src="bower_components/ace-builds/src-noconflict/ext-language_tools.js"></script>
     <script src="bower_components/ace-builds/src-noconflict/theme-chrome.js"></script>

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/d5e87fb8/zeppelin-web/test/karma.conf.js
----------------------------------------------------------------------
diff --git a/zeppelin-web/test/karma.conf.js b/zeppelin-web/test/karma.conf.js
index f6f9969..320492f 100644
--- a/zeppelin-web/test/karma.conf.js
+++ b/zeppelin-web/test/karma.conf.js
@@ -37,6 +37,7 @@ module.exports = function(config) {
       'bower_components/ace-builds/src-noconflict/mode-sql.js',
       'bower_components/ace-builds/src-noconflict/mode-markdown.js',
       'bower_components/ace-builds/src-noconflict/mode-sh.js',
+      'bower_components/ace-builds/src-noconflict/mode-r.js',
       'bower_components/ace-builds/src-noconflict/keybinding-emacs.js',
       'bower_components/ace-builds/src-noconflict/ext-language_tools.js',
       'bower_components/ace-builds/src-noconflict/theme-chrome.js',

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/d5e87fb8/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index b33391a..fc11a41 100755
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -472,7 +472,9 @@ public class ZeppelinConfiguration extends XMLConfiguration {
         + "org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,"
         + "org.apache.zeppelin.scalding.ScaldingInterpreter,"
         + "org.apache.zeppelin.jdbc.JDBCInterpreter,"
-        + "org.apache.zeppelin.hbase.HbaseInterpreter"),
+        + "org.apache.zeppelin.hbase.HbaseInterpreter,"
+        + "org.apache.zeppelin.rinterpreter.RRepl,"
+        + "org.apache.zeppelin.rinterpreter.KnitR"),
     ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"),
     ZEPPELIN_INTERPRETER_LOCALREPO("zeppelin.interpreter.localRepo", "local-repo"),
     ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT("zeppelin.interpreter.connect.timeout", 30000),
