Repository: zeppelin
Updated Branches:
  refs/heads/master 0cff6f0f0 -> 483dc3f2b


ZEPPELIN-3296. Reorg livy integration test to minimize livy session

### What is this PR for?
Just refactor livy integration test to minuze livy session so that we can 
reduce the livy build time.

### What type of PR is it?
[Refactoring]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-3296

### How should this be tested?
* Travis CI pass

### Screenshots (if appropriate)

### Questions:
* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? No

Author: Jeff Zhang <zjf...@apache.org>

Closes #2844 from zjffdu/ZEPPELIN-3296 and squashes the following commits:

206ea3e [Jeff Zhang] ZEPPELIN-3296. Reorg livy integration test to minimize 
livy session


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/483dc3f2
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/483dc3f2
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/483dc3f2

Branch: refs/heads/master
Commit: 483dc3f2bb46d18b7bbb41d72118c356bd9de403
Parents: 0cff6f0
Author: Jeff Zhang <zjf...@apache.org>
Authored: Tue Mar 6 17:17:35 2018 +0800
Committer: Jeff Zhang <zjf...@apache.org>
Committed: Wed Mar 7 09:24:40 2018 +0800

----------------------------------------------------------------------
 .travis.yml                                     |   9 +-
 .../apache/zeppelin/livy/LivyInterpreterIT.java | 646 +++++++------------
 2 files changed, 250 insertions(+), 405 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/483dc3f2/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index a6f72c8..9edb198 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -114,17 +114,16 @@ matrix:
       dist: trusty
       env: PYTHON="3" SCALA_VER="2.10" SPARK_VER="1.6.3" HADOOP_VER="2.6" 
PROFILE="-Pspark-1.6 -Phadoop2 -Phadoop-2.6 -Pscala-2.10" SPARKR="true" 
BUILD_FLAG="package -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" 
MODULES="-pl spark/interpreter,spark/spark-dependencies" 
TEST_PROJECTS="-Dtest=org.apache.zeppelin.spark.* -DfailIfNoTests=false"
 
-    # Test python/pyspark with python 2, livy 0.5
     - sudo: required
       dist: trusty
       jdk: "openjdk7"
-      env: PYTHON="2" SCALA_VER="2.10" SPARK_VER="1.6.3" HADOOP_VER="2.6" 
LIVY_VER="0.5.0-incubating" PROFILE="-Pspark-1.6 -Phadoop2 -Phadoop-2.6 
-Pscala-2.10" BUILD_FLAG="install -am -DskipTests -DskipRat" TEST_FLAG="verify 
-DskipRat" MODULES="-pl livy" TEST_PROJECTS="-Dpyspark.test.exclude='' 
-DfailIfNoTests=false"
+      env: PYTHON="2" SPARK_VER="1.6.3" HADOOP_VER="2.6" 
LIVY_VER="0.5.0-incubating" PROFILE="" BUILD_FLAG="install -am -DskipTests 
-DskipRat" TEST_FLAG="verify -DskipRat" MODULES="-pl livy" TEST_PROJECTS=""
 
-    # Test python/pyspark with python 3, livy 0.5
+    # Test livy 0.5 with spark 2.2.0 under python3
     - sudo: required
       dist: trusty
-      jdk: "openjdk7"
-      env: PYTHON="3" SCALA_VER="2.11" SPARK_VER="2.0.0" HADOOP_VER="2.6" 
LIVY_VER="0.5.0-incubating" PROFILE="-Pspark-2.0 -Phadoop3 -Phadoop-2.6 
-Pscala-2.11" BUILD_FLAG="install -am -DskipTests -DskipRat" TEST_FLAG="verify 
-DskipRat" MODULES="-pl livy" TEST_PROJECTS="-Dpyspark.test.exclude='' 
-DfailIfNoTests=false"
+      jdk: "openjdk8"
+      env: PYTHON="3" SPARK_VER="2.2.0" HADOOP_VER="2.6" 
LIVY_VER="0.5.0-incubating" PROFILE="" BUILD_FLAG="install -am -DskipTests 
-DskipRat" TEST_FLAG="verify -DskipRat" MODULES="-pl livy" TEST_PROJECTS=""
 
 before_install:
   # check files included in commit range, clear bower_components if a 
bower.json file has changed.

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/483dc3f2/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
----------------------------------------------------------------------
diff --git a/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java 
b/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
index 3dfeb36..96fdbea 100644
--- a/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
+++ b/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
@@ -18,17 +18,27 @@
 package org.apache.zeppelin.livy;
 
 
+import org.apache.commons.io.IOUtils;
 import org.apache.livy.test.framework.Cluster;
 import org.apache.livy.test.framework.Cluster$;
-import org.apache.commons.io.IOUtils;
-import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterOutput;
+import org.apache.zeppelin.interpreter.InterpreterOutputListener;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResultMessage;
+import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput;
+import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
 import org.apache.zeppelin.user.AuthenticationInfo;
-import org.junit.*;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Properties;
 
@@ -78,128 +88,7 @@ public class LivyInterpreterIT {
 
 
   @Test
-  public void testSparkInterpreterRDD() throws InterpreterException {
-    if (!checkPreCondition()) {
-      return;
-    }
-    InterpreterGroup interpreterGroup = new InterpreterGroup("group_1");
-    interpreterGroup.put("session_1", new ArrayList<Interpreter>());
-    final LivySparkInterpreter sparkInterpreter = new 
LivySparkInterpreter(properties);
-    sparkInterpreter.setInterpreterGroup(interpreterGroup);
-    interpreterGroup.get("session_1").add(sparkInterpreter);
-    AuthenticationInfo authInfo = new AuthenticationInfo("user1");
-    MyInterpreterOutputListener outputListener = new 
MyInterpreterOutputListener();
-    InterpreterOutput output = new InterpreterOutput(outputListener);
-    final InterpreterContext context = new InterpreterContext("noteId", 
"paragraphId", "livy.spark",
-        "title", "text", authInfo, null, null, null, null, null, null, output);
-    sparkInterpreter.open();
-
-    try {
-      // detect spark version
-      InterpreterResult result = sparkInterpreter.interpret("sc.version", 
context);
-      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-      assertEquals(1, result.message().size());
-
-      boolean isSpark2 = isSpark2(sparkInterpreter, context);
-
-      // test RDD api
-      result = sparkInterpreter.interpret("sc.parallelize(1 to 10).sum()", 
context);
-      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-      assertEquals(1, result.message().size());
-      assertTrue(result.message().get(0).getData().contains("Double = 55.0"));
-
-      // single line comment
-      String singleLineComment = "println(1)// my comment";
-      result = sparkInterpreter.interpret(singleLineComment, context);
-      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-      assertEquals(1, result.message().size());
-
-      // multiple line comment
-      String multipleLineComment = "println(1)/* multiple \n" + "line \n" + 
"comment */";
-      result = sparkInterpreter.interpret(multipleLineComment, context);
-      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-      assertEquals(1, result.message().size());
-
-      // multi-line string
-      String multiLineString = "val str = \"\"\"multiple\n" +
-          "line\"\"\"\n" +
-          "println(str)";
-      result = sparkInterpreter.interpret(multiLineString, context);
-      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-      assertEquals(1, result.message().size());
-      assertTrue(result.message().get(0).getData().contains("multiple\nline"));
-
-      // case class
-      String caseClassCode = "case class Person(id:Int, \n" +
-          "name:String)\n" +
-          "val p=Person(1, \"name_a\")";
-      result = sparkInterpreter.interpret(caseClassCode, context);
-      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-      assertEquals(1, result.message().size());
-      assertTrue(result.message().get(0).getData().contains("p: Person = 
Person(1,name_a)"));
-
-      // object class
-      String objectClassCode = "object Person {}";
-      result = sparkInterpreter.interpret(objectClassCode, context);
-      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-      assertEquals(1, result.message().size());
-      if (!isSpark2) {
-        assertTrue(result.message().get(0).getData().contains("defined module 
Person"));
-      } else {
-        assertTrue(result.message().get(0).getData().contains("defined object 
Person"));
-      }
-
-      // html output
-      String htmlCode = "println(\"%html <h1> hello </h1>\")";
-      result = sparkInterpreter.interpret(htmlCode, context);
-      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-      assertEquals(1, result.message().size());
-      assertEquals(InterpreterResult.Type.HTML, 
result.message().get(0).getType());
-
-      // error
-      result = sparkInterpreter.interpret("println(a)", context);
-      assertEquals(InterpreterResult.Code.ERROR, result.code());
-      assertEquals(InterpreterResult.Type.TEXT, 
result.message().get(0).getType());
-      assertTrue(result.message().get(0).getData().contains("error: not found: 
value a"));
-
-      // incomplete code
-      result = sparkInterpreter.interpret("if(true){", context);
-      assertEquals(InterpreterResult.Code.ERROR, result.code());
-      assertEquals(InterpreterResult.Type.TEXT, 
result.message().get(0).getType());
-      assertTrue(result.message().get(0).getData().contains("incomplete 
statement"));
-
-      // cancel
-      if 
(sparkInterpreter.livyVersion.newerThanEquals(LivyVersion.LIVY_0_3_0)) {
-        Thread cancelThread = new Thread() {
-          @Override
-          public void run() {
-            // invoke cancel after 1 millisecond to wait job starting
-            try {
-              Thread.sleep(1);
-            } catch (InterruptedException e) {
-              e.printStackTrace();
-            }
-            sparkInterpreter.cancel(context);
-          }
-        };
-        cancelThread.start();
-        result = sparkInterpreter
-            .interpret("sc.parallelize(1 to 
10).foreach(e=>Thread.sleep(10*1000))", context);
-        assertEquals(InterpreterResult.Code.ERROR, result.code());
-        String message = result.message().get(0).getData();
-        // 2 possibilities, sometimes livy doesn't return the real cancel 
exception
-        assertTrue(message.contains("cancelled part of cancelled job group") ||
-            message.contains("Job is cancelled"));
-      }
-
-    } finally {
-      sparkInterpreter.close();
-    }
-  }
-
-
-  @Test
-  public void testSparkInterpreterDataFrame() throws InterpreterException {
+  public void testSparkInterpreter() throws InterpreterException {
     if (!checkPreCondition()) {
       return;
     }
@@ -227,309 +116,229 @@ public class LivyInterpreterIT {
       assertEquals(1, result.message().size());
 
       boolean isSpark2 = isSpark2(sparkInterpreter, context);
+      testRDD(sparkInterpreter, isSpark2);
+      testDataFrame(sparkInterpreter, sqlInterpreter, isSpark2);
 
-      // test DataFrame api
-      if (!isSpark2) {
-        result = sparkInterpreter.interpret(
-            "val 
df=sqlContext.createDataFrame(Seq((\"hello\",20))).toDF(\"col_1\", \"col_2\")\n"
-                + "df.collect()", context);
-        assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-        assertEquals(1, result.message().size());
-        assertTrue(result.message().get(0).getData()
-            .contains("Array[org.apache.spark.sql.Row] = Array([hello,20])"));
-      } else {
-        result = sparkInterpreter.interpret(
-            "val df=spark.createDataFrame(Seq((\"hello\",20))).toDF(\"col_1\", 
\"col_2\")\n"
-                + "df.collect()", context);
-        assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-        assertEquals(1, result.message().size());
-        assertTrue(result.message().get(0).getData()
-            .contains("Array[org.apache.spark.sql.Row] = Array([hello,20])"));
-      }
-      sparkInterpreter.interpret("df.registerTempTable(\"df\")", context);
-      // test LivySparkSQLInterpreter which share the same SparkContext with 
LivySparkInterpreter
-      result = sqlInterpreter.interpret("select * from df where 
col_1='hello'", context);
-      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-      assertEquals(InterpreterResult.Type.TABLE, 
result.message().get(0).getType());
-      assertEquals("col_1\tcol_2\nhello\t20", 
result.message().get(0).getData());
-      // double quotes
-      result = sqlInterpreter.interpret("select * from df where 
col_1=\"hello\"", context);
-      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-      assertEquals(InterpreterResult.Type.TABLE, 
result.message().get(0).getType());
-      assertEquals("col_1\tcol_2\nhello\t20", 
result.message().get(0).getData());
-
-      // only enable this test in spark2 as spark1 doesn't work for this case
-      if (isSpark2) {
-        result = sqlInterpreter.interpret("select * from df where 
col_1=\"he\\\"llo\" ", context);
-        assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-        assertEquals(InterpreterResult.Type.TABLE, 
result.message().get(0).getType());
-      }
-
-      // single quotes inside attribute value
-      result = sqlInterpreter.interpret("select * from df where 
col_1=\"he'llo\"", context);
-      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-      assertEquals(InterpreterResult.Type.TABLE, 
result.message().get(0).getType());
-
-      // test sql with syntax error
-      result = sqlInterpreter.interpret("select * from df2", context);
-      assertEquals(InterpreterResult.Code.ERROR, result.code());
-      assertEquals(InterpreterResult.Type.TEXT, 
result.message().get(0).getType());
-
-      if (!isSpark2) {
-        assertTrue(result.message().get(0).getData().contains("Table not 
found"));
-      } else {
-        assertTrue(result.message().get(0).getData().contains("Table or view 
not found"));
-      }
     } finally {
       sparkInterpreter.close();
       sqlInterpreter.close();
     }
   }
 
-  @Test
-  public void testSparkSQLInterpreter() throws InterpreterException {
-    if (!checkPreCondition()) {
-      return;
-    }
-    InterpreterGroup interpreterGroup = new InterpreterGroup("group_1");
-    interpreterGroup.put("session_1", new ArrayList<Interpreter>());
-    LazyOpenInterpreter sparkInterpreter = new LazyOpenInterpreter(
-        new LivySparkInterpreter(properties));
-    sparkInterpreter.setInterpreterGroup(interpreterGroup);
-    interpreterGroup.get("session_1").add(sparkInterpreter);
-    LazyOpenInterpreter sqlInterpreter = new LazyOpenInterpreter(
-        new LivySparkSQLInterpreter(properties));
-    interpreterGroup.get("session_1").add(sqlInterpreter);
-    sqlInterpreter.setInterpreterGroup(interpreterGroup);
-    sqlInterpreter.open();
-
-    try {
-      AuthenticationInfo authInfo = new AuthenticationInfo("user1");
-      MyInterpreterOutputListener outputListener = new 
MyInterpreterOutputListener();
-      InterpreterOutput output = new InterpreterOutput(outputListener);
-      InterpreterContext context = new InterpreterContext("noteId", 
"paragraphId", "livy.sql",
-          "title", "text", authInfo, null, null, null, null, null, null, 
output);
-      InterpreterResult result = sqlInterpreter.interpret("show tables", 
context);
-      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-      assertEquals(InterpreterResult.Type.TABLE, 
result.message().get(0).getType());
-      assertTrue(result.message().get(0).getData().contains("tableName"));
-      int r = sqlInterpreter.getProgress(context);
-      assertTrue(r == 0);
-    } finally {
-      sqlInterpreter.close();
-    }
-  }
-
-
-  @Test
-  public void testSparkSQLCancellation() throws InterpreterException {
-    if (!checkPreCondition()) {
-      return;
-    }
-    InterpreterGroup interpreterGroup = new InterpreterGroup("group_1");
-    interpreterGroup.put("session_1", new ArrayList<Interpreter>());
-    LivySparkInterpreter sparkInterpreter = new 
LivySparkInterpreter(properties);
-    sparkInterpreter.setInterpreterGroup(interpreterGroup);
-    interpreterGroup.get("session_1").add(sparkInterpreter);
+  private void testRDD(final LivySparkInterpreter sparkInterpreter, boolean 
isSpark2) {
     AuthenticationInfo authInfo = new AuthenticationInfo("user1");
     MyInterpreterOutputListener outputListener = new 
MyInterpreterOutputListener();
     InterpreterOutput output = new InterpreterOutput(outputListener);
     final InterpreterContext context = new InterpreterContext("noteId", 
"paragraphId", "livy.spark",
         "title", "text", authInfo, null, null, null, null, null, null, output);
-    sparkInterpreter.open();
-
-    final LivySparkSQLInterpreter sqlInterpreter = new 
LivySparkSQLInterpreter(properties);
-    interpreterGroup.get("session_1").add(sqlInterpreter);
-    sqlInterpreter.setInterpreterGroup(interpreterGroup);
-    sqlInterpreter.open();
-
-    try {
-      // detect spark version
-      InterpreterResult result = sparkInterpreter.interpret("sc.version", 
context);
-      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-      assertEquals(1, result.message().size());
-
-      boolean isSpark2 = isSpark2(sparkInterpreter, context);
-
-      // test DataFrame api
-      if (!isSpark2) {
-        result = sparkInterpreter.interpret(
-            "val 
df=sqlContext.createDataFrame(Seq((\"hello\",20))).toDF(\"col_1\", \"col_2\")\n"
-                + "df.collect()", context);
-        assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-        assertEquals(1, result.message().size());
-        assertTrue(result.message().get(0).getData()
-            .contains("Array[org.apache.spark.sql.Row] = Array([hello,20])"));
-      } else {
-        result = sparkInterpreter.interpret(
-            "val df=spark.createDataFrame(Seq((\"hello\",20))).toDF(\"col_1\", 
\"col_2\")\n"
-                + "df.collect()", context);
-        assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-        assertEquals(1, result.message().size());
-        assertTrue(result.message().get(0).getData()
-            .contains("Array[org.apache.spark.sql.Row] = Array([hello,20])"));
-      }
-      sparkInterpreter.interpret("df.registerTempTable(\"df\")", context);
 
-      // cancel
-      if 
(sqlInterpreter.getLivyVersion().newerThanEquals(LivyVersion.LIVY_0_3_0)) {
-        Thread cancelThread = new Thread() {
-          @Override
-          public void run() {
-            sqlInterpreter.cancel(context);
+    InterpreterResult result = sparkInterpreter.interpret("sc.parallelize(1 to 
10).sum()", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(1, result.message().size());
+    assertTrue(result.message().get(0).getData().contains("Double = 55.0"));
+
+    // single line comment
+    String singleLineComment = "println(1)// my comment";
+    result = sparkInterpreter.interpret(singleLineComment, context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(1, result.message().size());
+
+    // multiple line comment
+    String multipleLineComment = "println(1)/* multiple \n" + "line \n" + 
"comment */";
+    result = sparkInterpreter.interpret(multipleLineComment, context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(1, result.message().size());
+
+    // multi-line string
+    String multiLineString = "val str = \"\"\"multiple\n" +
+        "line\"\"\"\n" +
+        "println(str)";
+    result = sparkInterpreter.interpret(multiLineString, context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(1, result.message().size());
+    assertTrue(result.message().get(0).getData().contains("multiple\nline"));
+
+    // case class
+    String caseClassCode = "case class Person(id:Int, \n" +
+        "name:String)\n" +
+        "val p=Person(1, \"name_a\")";
+    result = sparkInterpreter.interpret(caseClassCode, context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(1, result.message().size());
+    assertTrue(result.message().get(0).getData().contains("p: Person = 
Person(1,name_a)"));
+
+    // object class
+    String objectClassCode = "object Person {}";
+    result = sparkInterpreter.interpret(objectClassCode, context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(1, result.message().size());
+    if (!isSpark2) {
+      assertTrue(result.message().get(0).getData().contains("defined module 
Person"));
+    } else {
+      assertTrue(result.message().get(0).getData().contains("defined object 
Person"));
+    }
+
+    // html output
+    String htmlCode = "println(\"%html <h1> hello </h1>\")";
+    result = sparkInterpreter.interpret(htmlCode, context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(1, result.message().size());
+    assertEquals(InterpreterResult.Type.HTML, 
result.message().get(0).getType());
+
+    // error
+    result = sparkInterpreter.interpret("println(a)", context);
+    assertEquals(InterpreterResult.Code.ERROR, result.code());
+    assertEquals(InterpreterResult.Type.TEXT, 
result.message().get(0).getType());
+    assertTrue(result.message().get(0).getData().contains("error: not found: 
value a"));
+
+    // incomplete code
+    result = sparkInterpreter.interpret("if(true){", context);
+    assertEquals(InterpreterResult.Code.ERROR, result.code());
+    assertEquals(InterpreterResult.Type.TEXT, 
result.message().get(0).getType());
+    assertTrue(result.message().get(0).getData().contains("incomplete 
statement"));
+
+    // cancel
+    if (sparkInterpreter.livyVersion.newerThanEquals(LivyVersion.LIVY_0_3_0)) {
+      Thread cancelThread = new Thread() {
+        @Override
+        public void run() {
+          // invoke cancel after 1 millisecond to wait job starting
+          try {
+            Thread.sleep(1);
+          } catch (InterruptedException e) {
+            e.printStackTrace();
           }
-        };
-        cancelThread.start();
-        //sleep so that cancelThread performs a cancel.
-        try {
-          Thread.sleep(1);
-        } catch (InterruptedException e) {
-          e.printStackTrace();
-        }
-        result = sqlInterpreter
-            .interpret("select count(1) from df", context);
-        if (result.code().equals(InterpreterResult.Code.ERROR)) {
-          String message = result.message().get(0).getData();
-          // 2 possibilities, sometimes livy doesn't return the real cancel 
exception
-          assertTrue(message.contains("cancelled part of cancelled job group") 
||
-              message.contains("Job is cancelled"));
+          sparkInterpreter.cancel(context);
         }
-      }
-    } catch (LivyException e) {
-    } finally {
-      sparkInterpreter.close();
-      sqlInterpreter.close();
+      };
+      cancelThread.start();
+      result = sparkInterpreter
+          .interpret("sc.parallelize(1 to 
10).foreach(e=>Thread.sleep(10*1000))", context);
+      assertEquals(InterpreterResult.Code.ERROR, result.code());
+      String message = result.message().get(0).getData();
+      // 2 possibilities, sometimes livy doesn't return the real cancel 
exception
+      assertTrue(message.contains("cancelled part of cancelled job group") ||
+          message.contains("Job is cancelled"));
     }
   }
 
-  @Test
-  public void testStringWithTruncation() throws InterpreterException {
-    if (!checkPreCondition()) {
-      return;
-    }
-    InterpreterGroup interpreterGroup = new InterpreterGroup("group_1");
-    interpreterGroup.put("session_1", new ArrayList<Interpreter>());
-    LivySparkInterpreter sparkInterpreter = new 
LivySparkInterpreter(properties);
-    sparkInterpreter.setInterpreterGroup(interpreterGroup);
-    interpreterGroup.get("session_1").add(sparkInterpreter);
+  private void testDataFrame(LivySparkInterpreter sparkInterpreter,
+                             final LivySparkSQLInterpreter sqlInterpreter,
+                             boolean isSpark2) throws LivyException {
     AuthenticationInfo authInfo = new AuthenticationInfo("user1");
     MyInterpreterOutputListener outputListener = new 
MyInterpreterOutputListener();
     InterpreterOutput output = new InterpreterOutput(outputListener);
-    InterpreterContext context = new InterpreterContext("noteId", 
"paragraphId", "livy.spark",
+    final InterpreterContext context = new InterpreterContext("noteId", 
"paragraphId", "livy.spark",
         "title", "text", authInfo, null, null, null, null, null, null, output);
-    sparkInterpreter.open();
-
-    LivySparkSQLInterpreter sqlInterpreter = new 
LivySparkSQLInterpreter(properties);
-    interpreterGroup.get("session_1").add(sqlInterpreter);
-    sqlInterpreter.setInterpreterGroup(interpreterGroup);
-    sqlInterpreter.open();
 
-    try {
-      // detect spark version
-      InterpreterResult result = sparkInterpreter.interpret("sc.version", 
context);
+    InterpreterResult result = null;
+    // test DataFrame api
+    if (!isSpark2) {
+      result = sparkInterpreter.interpret(
+          "val 
df=sqlContext.createDataFrame(Seq((\"hello\",20))).toDF(\"col_1\", \"col_2\")\n"
+              + "df.collect()", context);
       assertEquals(InterpreterResult.Code.SUCCESS, result.code());
       assertEquals(1, result.message().size());
-
-      boolean isSpark2 = isSpark2(sparkInterpreter, context);
-
-      // test DataFrame api
-      if (!isSpark2) {
-        result = sparkInterpreter.interpret(
-            "val 
df=sqlContext.createDataFrame(Seq((\"12characters12characters\",20))).toDF(\"col_1\",
 \"col_2\")\n"
-                + "df.collect()", context);
-        assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-        assertEquals(1, result.message().size());
-        assertTrue(result.message().get(0).getData()
-            .contains("Array[org.apache.spark.sql.Row] = 
Array([12characters12characters,20])"));
-      } else {
-        result = sparkInterpreter.interpret(
-            "val 
df=spark.createDataFrame(Seq((\"12characters12characters\",20))).toDF(\"col_1\",
 \"col_2\")\n"
-                + "df.collect()", context);
-        assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-        assertEquals(1, result.message().size());
-        assertTrue(result.message().get(0).getData()
-            .contains("Array[org.apache.spark.sql.Row] = 
Array([12characters12characters,20])"));
-      }
-      sparkInterpreter.interpret("df.registerTempTable(\"df\")", context);
-      // test LivySparkSQLInterpreter which share the same SparkContext with 
LivySparkInterpreter
-      result = sqlInterpreter.interpret("select * from df where 
col_1='12characters12characters'", context);
+      assertTrue(result.message().get(0).getData()
+          .contains("Array[org.apache.spark.sql.Row] = Array([hello,20])"));
+    } else {
+      result = sparkInterpreter.interpret(
+          "val df=spark.createDataFrame(Seq((\"hello\",20))).toDF(\"col_1\", 
\"col_2\")\n"
+              + "df.collect()", context);
+      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+      assertEquals(1, result.message().size());
+      assertTrue(result.message().get(0).getData()
+          .contains("Array[org.apache.spark.sql.Row] = Array([hello,20])"));
+    }
+    sparkInterpreter.interpret("df.registerTempTable(\"df\")", context);
+    // test LivySparkSQLInterpreter which share the same SparkContext with 
LivySparkInterpreter
+    result = sqlInterpreter.interpret("select * from df where col_1='hello'", 
context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(InterpreterResult.Type.TABLE, 
result.message().get(0).getType());
+    assertEquals("col_1\tcol_2\nhello\t20", result.message().get(0).getData());
+    // double quotes
+    result = sqlInterpreter.interpret("select * from df where 
col_1=\"hello\"", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(InterpreterResult.Type.TABLE, 
result.message().get(0).getType());
+    assertEquals("col_1\tcol_2\nhello\t20", result.message().get(0).getData());
+
+    // only enable this test in spark2 as spark1 doesn't work for this case
+    if (isSpark2) {
+      result = sqlInterpreter.interpret("select * from df where 
col_1=\"he\\\"llo\" ", context);
       assertEquals(InterpreterResult.Code.SUCCESS, result.code());
       assertEquals(InterpreterResult.Type.TABLE, 
result.message().get(0).getType());
-      assertEquals("col_1\tcol_2\n12characters12cha...\t20", 
result.message().get(0).getData());
-    } finally {
-      sparkInterpreter.close();
-      sqlInterpreter.close();
     }
-  }
 
+    // single quotes inside attribute value
+    result = sqlInterpreter.interpret("select * from df where 
col_1=\"he'llo\"", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(InterpreterResult.Type.TABLE, 
result.message().get(0).getType());
 
-  @Test
-  public void testStringWithoutTruncation() throws InterpreterException {
-    if (!checkPreCondition()) {
-      return;
-    }
-    InterpreterGroup interpreterGroup = new InterpreterGroup("group_1");
-    interpreterGroup.put("session_1", new ArrayList<Interpreter>());
-    Properties newProps = new Properties();
-    for (Object name: properties.keySet()) {
-      newProps.put(name, properties.get(name));
+    // test sql with syntax error
+    result = sqlInterpreter.interpret("select * from df2", context);
+    assertEquals(InterpreterResult.Code.ERROR, result.code());
+    assertEquals(InterpreterResult.Type.TEXT, 
result.message().get(0).getType());
+
+    if (!isSpark2) {
+      assertTrue(result.message().get(0).getData().contains("Table not 
found"));
+    } else {
+      assertTrue(result.message().get(0).getData().contains("Table or view not 
found"));
     }
-    
newProps.put(LivySparkSQLInterpreter.ZEPPELIN_LIVY_SPARK_SQL_FIELD_TRUNCATE, 
"false");
-    LivySparkInterpreter sparkInterpreter = new LivySparkInterpreter(newProps);
-    sparkInterpreter.setInterpreterGroup(interpreterGroup);
-    interpreterGroup.get("session_1").add(sparkInterpreter);
-    AuthenticationInfo authInfo = new AuthenticationInfo("user1");
-    MyInterpreterOutputListener outputListener = new 
MyInterpreterOutputListener();
-    InterpreterOutput output = new InterpreterOutput(outputListener);
-    InterpreterContext context = new InterpreterContext("noteId", 
"paragraphId", "livy.spark",
-        "title", "text", authInfo, null, null, null, null, null, null, output);
-    sparkInterpreter.open();
 
-    LivySparkSQLInterpreter sqlInterpreter = new 
LivySparkSQLInterpreter(newProps);
-    interpreterGroup.get("session_1").add(sqlInterpreter);
-    sqlInterpreter.setInterpreterGroup(interpreterGroup);
-    sqlInterpreter.open();
+    // test sql cancel
+    if 
(sqlInterpreter.getLivyVersion().newerThanEquals(LivyVersion.LIVY_0_3_0)) {
+      Thread cancelThread = new Thread() {
+        @Override
+        public void run() {
+          sqlInterpreter.cancel(context);
+        }
+      };
+      cancelThread.start();
+      //sleep so that cancelThread performs a cancel.
+      try {
+        Thread.sleep(1);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+      result = sqlInterpreter
+          .interpret("select count(1) from df", context);
+      if (result.code().equals(InterpreterResult.Code.ERROR)) {
+        String message = result.message().get(0).getData();
+        // 2 possibilities, sometimes livy doesn't return the real cancel 
exception
+        assertTrue(message.contains("cancelled part of cancelled job group") ||
+            message.contains("Job is cancelled"));
+      }
+    }
 
-    try {
-      // detect spark version
-      InterpreterResult result = sparkInterpreter.interpret("sc.version", 
context);
+    // test result string truncate
+    if (!isSpark2) {
+      result = sparkInterpreter.interpret(
+          "val 
df=sqlContext.createDataFrame(Seq((\"12characters12characters\",20))).toDF(\"col_1\",
 \"col_2\")\n"
+              + "df.collect()", context);
       assertEquals(InterpreterResult.Code.SUCCESS, result.code());
       assertEquals(1, result.message().size());
-
-      boolean isSpark2 = isSpark2(sparkInterpreter, context);
-
-      // test DataFrame api
-      if (!isSpark2) {
-        result = sparkInterpreter.interpret(
-            "val 
df=sqlContext.createDataFrame(Seq((\"12characters12characters\",20))).toDF(\"col_1\",
 \"col_2\")\n"
-                + "df.collect()", context);
-        assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-        assertEquals(1, result.message().size());
-        assertTrue(result.message().get(0).getData()
-            .contains("Array[org.apache.spark.sql.Row] = 
Array([12characters12characters,20])"));
-      } else {
-        result = sparkInterpreter.interpret(
-            "val 
df=spark.createDataFrame(Seq((\"12characters12characters\",20))).toDF(\"col_1\",
 \"col_2\")\n"
-                + "df.collect()", context);
-        assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-        assertEquals(1, result.message().size());
-        assertTrue(result.message().get(0).getData()
-            .contains("Array[org.apache.spark.sql.Row] = 
Array([12characters12characters,20])"));
-      }
-      sparkInterpreter.interpret("df.registerTempTable(\"df\")", context);
-      // test LivySparkSQLInterpreter which share the same SparkContext with 
LivySparkInterpreter
-      result = sqlInterpreter.interpret("select * from df where 
col_1='12characters12characters'", context);
+      assertTrue(result.message().get(0).getData()
+          .contains("Array[org.apache.spark.sql.Row] = 
Array([12characters12characters,20])"));
+    } else {
+      result = sparkInterpreter.interpret(
+          "val 
df=spark.createDataFrame(Seq((\"12characters12characters\",20))).toDF(\"col_1\",
 \"col_2\")\n"
+              + "df.collect()", context);
       assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-      assertEquals(InterpreterResult.Type.TABLE, 
result.message().get(0).getType());
-      assertEquals("col_1\tcol_2\n12characters12characters\t20", 
result.message().get(0).getData());
-    } finally {
-      sparkInterpreter.close();
-      sqlInterpreter.close();
+      assertEquals(1, result.message().size());
+      assertTrue(result.message().get(0).getData()
+          .contains("Array[org.apache.spark.sql.Row] = 
Array([12characters12characters,20])"));
     }
+    sparkInterpreter.interpret("df.registerTempTable(\"df\")", context);
+    // test LivySparkSQLInterpreter which share the same SparkContext with 
LivySparkInterpreter
+    result = sqlInterpreter.interpret("select * from df where 
col_1='12characters12characters'", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(InterpreterResult.Type.TABLE, 
result.message().get(0).getType());
+    assertEquals("col_1\tcol_2\n12characters12cha...\t20", 
result.message().get(0).getData());
+
   }
 
   @Test
-  public void testPySparkInterpreter() throws LivyException, 
InterpreterException {
+  public void testPySparkInterpreter() throws InterpreterException {
     if (!checkPreCondition()) {
       return;
     }
@@ -549,7 +358,7 @@ public class LivyInterpreterIT {
       // for livy version >=0.3 , input some erroneous spark code, check the 
shown result is more than one line
       InterpreterResult result = 
pysparkInterpreter.interpret("sc.parallelize(wrongSyntax(1, 2)).count()", 
context);
       assertEquals(InterpreterResult.Code.ERROR, result.code());
-      assertTrue(result.message().get(0).getData().split("\n").length>1);
+      assertTrue(result.message().get(0).getData().split("\n").length > 1);
       assertTrue(result.message().get(0).getData().contains("Traceback"));
     } catch (APINotFoundException e) {
       // only livy 0.2 can throw this exception since it doesn't have /version 
endpoint
@@ -557,17 +366,17 @@ public class LivyInterpreterIT {
       // traceback
       InterpreterResult result = pysparkInterpreter.interpret("print(a)", 
context);
       assertEquals(InterpreterResult.Code.ERROR, result.code());
-      assertTrue(result.message().get(0).getData().split("\n").length>1);
+      assertTrue(result.message().get(0).getData().split("\n").length > 1);
       assertTrue(result.message().get(0).getData().contains("Traceback"));
     }
 
     // test utf-8 Encoding
     try {
       String utf8Str = "你你你你你你好";
-      InterpreterResult result = 
pysparkInterpreter.interpret("print(\""+utf8Str+"\")", context);
+      InterpreterResult result = pysparkInterpreter.interpret("print(\"" + 
utf8Str + "\")", context);
       assertEquals(InterpreterResult.Code.SUCCESS, result.code());
       assertTrue(result.message().get(0).getData().contains(utf8Str));
-    }catch (Exception e) {
+    } catch (Exception e) {
       e.printStackTrace();
     }
 
@@ -650,7 +459,7 @@ public class LivyInterpreterIT {
   }
 
   @Test
-  public void testSparkInterpreterWithDisplayAppInfo() throws 
InterpreterException {
+  public void testSparkInterpreterWithDisplayAppInfo_StringWithoutTruncation() 
throws InterpreterException {
     if (!checkPreCondition()) {
       return;
     }
@@ -660,6 +469,7 @@ public class LivyInterpreterIT {
     properties2.put("zeppelin.livy.displayAppInfo", "true");
     // enable spark ui because it is disabled by livy integration test
     properties2.put("livy.spark.ui.enabled", "true");
+    
properties2.put(LivySparkSQLInterpreter.ZEPPELIN_LIVY_SPARK_SQL_FIELD_TRUNCATE, 
"false");
     LivySparkInterpreter sparkInterpreter = new 
LivySparkInterpreter(properties2);
     sparkInterpreter.setInterpreterGroup(interpreterGroup);
     interpreterGroup.get("session_1").add(sparkInterpreter);
@@ -670,6 +480,11 @@ public class LivyInterpreterIT {
         "title", "text", authInfo, null, null, null, null, null, null, output);
     sparkInterpreter.open();
 
+    LivySparkSQLInterpreter sqlInterpreter = new 
LivySparkSQLInterpreter(properties2);
+    interpreterGroup.get("session_1").add(sqlInterpreter);
+    sqlInterpreter.setInterpreterGroup(interpreterGroup);
+    sqlInterpreter.open();
+
     try {
       InterpreterResult result = sparkInterpreter.interpret("sc.version", 
context);
       assertEquals(InterpreterResult.Code.SUCCESS, result.code());
@@ -683,13 +498,44 @@ public class LivyInterpreterIT {
       assertEquals(2, result.message().size());
       assertEquals(InterpreterResult.Type.HTML, 
result.message().get(0).getType());
 
+      // detect spark version
+      result = sparkInterpreter.interpret("sc.version", context);
+      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+      assertEquals(2, result.message().size());
+
+      boolean isSpark2 = isSpark2(sparkInterpreter, context);
+
+      if (!isSpark2) {
+        result = sparkInterpreter.interpret(
+            "val 
df=sqlContext.createDataFrame(Seq((\"12characters12characters\",20))).toDF(\"col_1\",
 \"col_2\")\n"
+                + "df.collect()", context);
+        assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+        assertEquals(2, result.message().size());
+        assertTrue(result.message().get(0).getData()
+            .contains("Array[org.apache.spark.sql.Row] = 
Array([12characters12characters,20])"));
+      } else {
+        result = sparkInterpreter.interpret(
+            "val 
df=spark.createDataFrame(Seq((\"12characters12characters\",20))).toDF(\"col_1\",
 \"col_2\")\n"
+                + "df.collect()", context);
+        assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+        assertEquals(2, result.message().size());
+        assertTrue(result.message().get(0).getData()
+            .contains("Array[org.apache.spark.sql.Row] = 
Array([12characters12characters,20])"));
+      }
+      sparkInterpreter.interpret("df.registerTempTable(\"df\")", context);
+      // test LivySparkSQLInterpreter which share the same SparkContext with 
LivySparkInterpreter
+      result = sqlInterpreter.interpret("select * from df where 
col_1='12characters12characters'", context);
+      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+      assertEquals(InterpreterResult.Type.TABLE, 
result.message().get(0).getType());
+      assertEquals("col_1\tcol_2\n12characters12characters\t20", 
result.message().get(0).getData());
     } finally {
       sparkInterpreter.close();
+      sqlInterpreter.close();
     }
   }
 
   @Test
-  public void testSparkRInterpreter() throws LivyException, 
InterpreterException {
+  public void testSparkRInterpreter() throws InterpreterException {
     if (!checkPreCondition()) {
       return;
     }
@@ -847,7 +693,7 @@ public class LivyInterpreterIT {
       assertEquals(InterpreterResult.Code.SUCCESS, result.code());
       assertEquals(1, result.message().size());
 
-      boolean isSpark2 = 
isSpark2((BaseLivyInterpreter)sparkInterpreter.getInnerInterpreter(), context);
+      boolean isSpark2 = isSpark2((BaseLivyInterpreter) 
sparkInterpreter.getInnerInterpreter(), context);
 
       if (!isSpark2) {
         result = sparkInterpreter.interpret(
@@ -891,10 +737,10 @@ public class LivyInterpreterIT {
         assertEquals(1, result.message().size());
         assertTrue(result.message().get(0).getData()
             .contains("+-----+-----+\n" +
-                      "|col_1|col_2|\n" +
-                      "+-----+-----+\n" +
-                      "|hello|   20|\n" +
-                      "+-----+-----+"));
+                "|col_1|col_2|\n" +
+                "+-----+-----+\n" +
+                "|hello|   20|\n" +
+                "+-----+-----+"));
 
         // access table from sparkr
         result = sparkRInterpreter.interpret("head(sql(\"select * from 
df\"))", context);

Reply via email to