Repository: zeppelin
Updated Branches:
  refs/heads/master b3ba458bd -> ad6e69181


[ZEPPELIN-2909]. Support shared SparkContext across languages in livy interpreter

### What is this PR for?
LIVY-194 implemented a shared SparkContext across languages; this ticket 
integrates that feature into the livy interpreter.
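
For context: with LIVY-194, a single Livy session of kind `shared` accepts statements in multiple languages, each statement declaring its language through a per-statement `kind` field. A minimal sketch of the payload shape (illustrative only, not code from this patch, though it mirrors the widened `ExecuteRequest` in `BaseLivyInterpreter`):

```java
import com.google.gson.Gson;

// Illustrative payload for POST /sessions/{id}/statements against a
// session created with kind "shared": each statement carries its own
// language kind ("spark", "pyspark", "sparkr" or "sql").
public class SharedStatementSketch {
  static class ExecuteRequest {
    final String code;
    final String kind;

    ExecuteRequest(String code, String kind) {
      this.code = code;
      this.kind = kind;
    }
  }

  public static void main(String[] args) {
    Gson gson = new Gson();
    System.out.println(gson.toJson(new ExecuteRequest("df.count()", "spark")));
    // -> {"code":"df.count()","kind":"spark"}
    System.out.println(gson.toJson(new ExecuteRequest("df.count()", "pyspark")));
    // -> {"code":"df.count()","kind":"pyspark"}
  }
}
```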

### What type of PR is it?
[ Feature ]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-2909

### How should this be tested?
Tests are added.

### Screenshots (if appropriate)

### Questions:
* Do the license files need updating? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Jeff Zhang <zjf...@apache.org>

Closes #2587 from zjffdu/ZEPPELIN-2909 and squashes the following commits:

d6e38e6 [Jeff Zhang] [ZEPPELIN-2909]. Support shared SparkContext across 
languages in livy interpreter


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/ad6e6918
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/ad6e6918
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/ad6e6918

Branch: refs/heads/master
Commit: ad6e691812957e240d37918bbeae4a3c537fcccf
Parents: b3ba458
Author: Jeff Zhang <zjf...@apache.org>
Authored: Wed Aug 23 20:34:15 2017 +0800
Committer: Jeff Zhang <zjf...@apache.org>
Committed: Thu Feb 8 21:38:05 2018 +0800

----------------------------------------------------------------------
 .travis.yml                                     |   8 +-
 docs/interpreter/livy.md                        |   4 +
 livy/pom.xml                                    | 148 ++++++++++++++++++-
 .../zeppelin/livy/BaseLivyInterpreter.java      |  88 +++++++++--
 .../zeppelin/livy/LivySharedInterpreter.java    | 108 ++++++++++++++
 .../zeppelin/livy/LivySparkInterpreter.java     |   1 +
 .../zeppelin/livy/LivySparkSQLInterpreter.java  |  36 ++++-
 .../org/apache/zeppelin/livy/LivyVersion.java   |   5 +
 .../src/main/resources/interpreter-setting.json |  16 ++
 .../apache/zeppelin/livy/LivyInterpreterIT.java | 133 +++++++++++++++--
 10 files changed, 508 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ad6e6918/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index ce935b2..24da368 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -107,17 +107,17 @@ matrix:
       dist: trusty
       env: PYTHON="2" SCALA_VER="2.11" SPARK_VER="1.6.3" HADOOP_VER="2.6" PROFILE="-Pweb-ci -Pspark-1.6 -Phadoop3 -Phadoop-2.6 -Pscala-2.11" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat" TEST_FLAG="test -DskipRat" MODULES="-pl .,zeppelin-interpreter,zeppelin-zengine,zeppelin-server,zeppelin-display,spark/interpreter,spark/scala-2.10,spark/scala-2.11,spark/spark-dependencies,python" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest,org.apache.zeppelin.spark.* -DfailIfNoTests=false"
 
-    # Test python/pyspark with python 2, livy 0.2
+    # Test python/pyspark with python 2, livy 0.5
     - sudo: required
       dist: trusty
       jdk: "openjdk7"
-      env: PYTHON="2" SCALA_VER="2.10" SPARK_VER="1.6.1" HADOOP_VER="2.6" LIVY_VER="0.4.0-incubating" PROFILE="-Pspark-1.6 -Phadoop2 -Phadoop-2.6 -Plivy-0.2 -Pscala-2.10" BUILD_FLAG="install -am -DskipTests -DskipRat" TEST_FLAG="verify -DskipRat" MODULES="-pl .,zeppelin-interpreter,zeppelin-display,spark/interpreter,spark/scala-2.10,spark/scala-2.11,spark/spark-dependencies,python,livy" TEST_PROJECTS="-Dtest=LivySQLInterpreterTest,org.apache.zeppelin.spark.PySpark*Test,org.apache.zeppelin.python.* -Dpyspark.test.exclude='' -DfailIfNoTests=false"
+      env: PYTHON="2" SCALA_VER="2.10" SPARK_VER="1.6.3" HADOOP_VER="2.6" LIVY_VER="0.5.0-incubating" PROFILE="-Pspark-1.6 -Phadoop2 -Phadoop-2.6 -Pscala-2.10" BUILD_FLAG="install -am -DskipTests -DskipRat" TEST_FLAG="verify -DskipRat" MODULES="-pl .,zeppelin-interpreter,zeppelin-display,spark/interpreter,spark/scala-2.10,spark/scala-2.11,spark/spark-dependencies,python,livy" TEST_PROJECTS="-Dtest=LivySQLInterpreterTest,org.apache.zeppelin.spark.PySpark*Test,org.apache.zeppelin.python.* -Dpyspark.test.exclude='' -DfailIfNoTests=false"
 
-    # Test python/pyspark with python 3, livy 0.3
+    # Test python/pyspark with python 3, livy 0.5
     - sudo: required
       dist: trusty
       jdk: "openjdk7"
-      env: PYTHON="3" SCALA_VER="2.11" SPARK_VER="2.0.0" HADOOP_VER="2.6" LIVY_VER="0.4.0-incubating" PROFILE="-Pspark-2.0 -Phadoop3 -Phadoop-2.6 -Pscala-2.11 -Plivy-0.3" BUILD_FLAG="install -am -DskipTests -DskipRat" TEST_FLAG="verify -DskipRat" MODULES="-pl .,zeppelin-interpreter,zeppelin-display,spark/interpreter,spark/scala-2.10,spark/scala-2.11,spark/spark-dependencies,python,livy" TEST_PROJECTS="-Dtest=LivySQLInterpreterTest,org.apache.zeppelin.spark.PySpark*Test,org.apache.zeppelin.python.* -Dpyspark.test.exclude='' -DfailIfNoTests=false"
+      env: PYTHON="3" SCALA_VER="2.11" SPARK_VER="2.0.0" HADOOP_VER="2.6" LIVY_VER="0.5.0-incubating" PROFILE="-Pspark-2.0 -Phadoop3 -Phadoop-2.6 -Pscala-2.11" BUILD_FLAG="install -am -DskipTests -DskipRat" TEST_FLAG="verify -DskipRat" MODULES="-pl .,zeppelin-interpreter,zeppelin-display,spark/interpreter,spark/scala-2.10,spark/scala-2.11,spark/spark-dependencies,python,livy" TEST_PROJECTS="-Dtest=LivySQLInterpreterTest,org.apache.zeppelin.spark.PySpark*Test,org.apache.zeppelin.python.* -Dpyspark.test.exclude='' -DfailIfNoTests=false"
 
 before_install:
   # check files included in commit range, clear bower_components if a bower.json file has changed.

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ad6e6918/docs/interpreter/livy.md
----------------------------------------------------------------------
diff --git a/docs/interpreter/livy.md b/docs/interpreter/livy.md
index d53672a..e4784d4 100644
--- a/docs/interpreter/livy.md
+++ b/docs/interpreter/livy.md
@@ -216,6 +216,10 @@ select * from products where ${product_id=1}
 
 And creating dynamic forms programmatically is not feasible in livy interpreter, because ZeppelinContext is not available in livy interpreter.
 
+## Shared SparkContext
+Starting from Livy 0.5, which is supported by Zeppelin 0.8.0, the SparkContext is shared across Scala, Python, R and SQL.
+That means you can query a table via `%livy.sql` after it has been registered in `%livy.spark`, `%livy.pyspark`, or `%livy.sparkr`.
+
 ## FAQ
 
 Livy debugging: If you see any of these in error console

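To make the new livy.md section concrete, here is a usage sketch assuming a Livy 0.5 backend (the table name `df` mirrors the integration test added below): register a table from Scala, then query it from SQL.

```
%livy.spark
val df = spark.createDataFrame(Seq(("hello", 20))).toDF("col_1", "col_2")
df.registerTempTable("df")
```

```
%livy.sql
select * from df
```

Both paragraphs run against the same shared Livy session, so the temp table registered from Scala is directly visible to `%livy.sql`.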
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ad6e6918/livy/pom.xml
----------------------------------------------------------------------
diff --git a/livy/pom.xml b/livy/pom.xml
index 1c9d8fb..eddeb83 100644
--- a/livy/pom.xml
+++ b/livy/pom.xml
@@ -42,8 +42,9 @@
         <spring.security.kerberosclient>1.0.1.RELEASE</spring.security.kerberosclient>
 
         <!--test library versions-->
-        <livy.version>0.4.0-incubating</livy.version>
+        <livy.version>0.5.0-incubating</livy.version>
         <spark.version>2.1.0</spark.version>
+        <hadoop.version>2.6.0</hadoop.version>
         <!--plugin versions-->
         <plugin.failsafe.version>2.16</plugin.failsafe.version>
         <plugin.antrun.version>1.8</plugin.antrun.version>
@@ -105,6 +106,30 @@
                     <groupId>org.apache.spark</groupId>
                     <artifactId>spark-yarn_${scala.binary.version}</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-auth</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-common</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-hdfs</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-yarn-client</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-client</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-yarn-server-tests</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
         <dependency>
@@ -188,6 +213,127 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-auth</artifactId>
+            <version>${hadoop.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>${hadoop.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>guava</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <classifier>tests</classifier>
+            <version>${hadoop.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>guava</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+            <version>${hadoop.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>io.netty</groupId>
+                    <artifactId>netty</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>guava</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+            <classifier>tests</classifier>
+            <version>${hadoop.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>io.netty</groupId>
+                    <artifactId>netty</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>guava</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+            <version>${hadoop.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>guava</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-yarn-client</artifactId>
+            <version>${hadoop.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>guava</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-yarn-api</artifactId>
+            <version>${hadoop.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>guava</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-yarn-server-tests</artifactId>
+            <classifier>tests</classifier>
+            <version>${hadoop.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>guava</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ad6e6918/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterpreter.java
----------------------------------------------------------------------
diff --git a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterpreter.java
index f3b7579..724a4b3 100644
--- a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterpreter.java
+++ b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterpreter.java
@@ -57,6 +57,8 @@ import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterResultMessage;
 import org.apache.zeppelin.interpreter.InterpreterUtils;
+import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
+import org.apache.zeppelin.interpreter.WrappedInterpreter;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -95,6 +97,9 @@ public abstract class BaseLivyInterpreter extends Interpreter 
{
   private RestTemplate restTemplate;
   private Map<String, String> customHeaders = new HashMap<>();
 
+  // delegate to sharedInterpreter when it is available
+  protected LivySharedInterpreter sharedInterpreter;
+
   Set<Object> paragraphsToCancel = Collections.newSetFromMap(
       new ConcurrentHashMap<Object, Boolean>());
   private ConcurrentHashMap<String, Integer> paragraphId2StmtProgressMap =
@@ -144,7 +149,13 @@ public abstract class BaseLivyInterpreter extends 
Interpreter {
   @Override
   public void open() throws InterpreterException {
     try {
-      initLivySession();
+      this.livyVersion = getLivyVersion();
+      if (this.livyVersion.isSharedSupported()) {
+        sharedInterpreter = getLivySharedInterpreter();
+      }
+      if (sharedInterpreter == null || !sharedInterpreter.isSupported()) {
+        initLivySession();
+      }
     } catch (LivyException e) {
       String msg = "Fail to create session, please check livy interpreter log 
and " +
           "livy server log";
@@ -152,8 +163,32 @@ public abstract class BaseLivyInterpreter extends 
Interpreter {
     }
   }
 
+  protected LivySharedInterpreter getLivySharedInterpreter() throws 
InterpreterException {
+    LazyOpenInterpreter lazy = null;
+    LivySharedInterpreter sharedInterpreter = null;
+    Interpreter p = getInterpreterInTheSameSessionByClassName(
+        LivySharedInterpreter.class.getName());
+
+    while (p instanceof WrappedInterpreter) {
+      if (p instanceof LazyOpenInterpreter) {
+        lazy = (LazyOpenInterpreter) p;
+      }
+      p = ((WrappedInterpreter) p).getInnerInterpreter();
+    }
+    sharedInterpreter = (LivySharedInterpreter) p;
+
+    if (lazy != null) {
+      lazy.open();
+    }
+    return sharedInterpreter;
+  }
+
   @Override
   public void close() {
+    if (sharedInterpreter != null && sharedInterpreter.isSupported()) {
+      sharedInterpreter.close();
+      return;
+    }
     if (sessionInfo != null) {
       closeSession(sessionInfo.id);
       // reset sessionInfo to null so that we won't close it twice.
@@ -181,14 +216,6 @@ public abstract class BaseLivyInterpreter extends 
Interpreter {
     } else {
       LOGGER.info("Create livy session successfully with sessionId: {}", 
this.sessionInfo.id);
     }
-    // check livy version
-    try {
-      this.livyVersion = getLivyVersion();
-      LOGGER.info("Use livy " + livyVersion);
-    } catch (APINotFoundException e) {
-      this.livyVersion = new LivyVersion("0.2.0");
-      LOGGER.info("Use livy 0.2.0");
-    }
   }
 
   protected abstract String extractAppId() throws LivyException;
@@ -196,17 +223,30 @@ public abstract class BaseLivyInterpreter extends 
Interpreter {
   protected abstract String extractWebUIAddress() throws LivyException;
 
   public SessionInfo getSessionInfo() {
+    if (sharedInterpreter != null && sharedInterpreter.isSupported()) {
+      return sharedInterpreter.getSessionInfo();
+    }
     return sessionInfo;
   }
 
+  public String getCodeType() {
+    if (getSessionKind().equalsIgnoreCase("pyspark3")) {
+      return "pyspark";
+    }
+    return getSessionKind();
+  }
+
   @Override
   public InterpreterResult interpret(String st, InterpreterContext context) {
+    if (sharedInterpreter != null && sharedInterpreter.isSupported()) {
+      return sharedInterpreter.interpret(st, getCodeType(), context);
+    }
     if (StringUtils.isEmpty(st)) {
       return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
     }
 
     try {
-      return interpret(st, context.getParagraphId(), this.displayAppInfo, true);
+      return interpret(st, null, context.getParagraphId(), this.displayAppInfo, true);
     } catch (LivyException e) {
       LOGGER.error("Fail to interpret:" + st, e);
       return new InterpreterResult(InterpreterResult.Code.ERROR,
@@ -245,6 +285,10 @@ public abstract class BaseLivyInterpreter extends 
Interpreter {
 
   @Override
   public void cancel(InterpreterContext context) {
+    if (sharedInterpreter != null && sharedInterpreter.isSupported()) {
+      sharedInterpreter.cancel(context);
+      return;
+    }
     paragraphsToCancel.add(context.getParagraphId());
     LOGGER.info("Added paragraph " + context.getParagraphId() + " for 
cancellation.");
   }
@@ -256,6 +300,10 @@ public abstract class BaseLivyInterpreter extends 
Interpreter {
 
   @Override
   public int getProgress(InterpreterContext context) {
+    if (sharedInterpreter != null && sharedInterpreter.isSupported()) {
+      return sharedInterpreter.getProgress(context);
+    }
+
     if (livyVersion.isGetProgressSupported()) {
       String paraId = context.getParagraphId();
       Integer progress = paragraphId2StmtProgressMap.get(paraId);
@@ -312,11 +360,20 @@ public abstract class BaseLivyInterpreter extends 
Interpreter {
                                      String paragraphId,
                                      boolean displayAppInfo,
                                      boolean appendSessionExpired) throws 
LivyException {
+    return interpret(code, sharedInterpreter.isSupported() ? getSessionKind() : null,
+        paragraphId, displayAppInfo, appendSessionExpired);
+  }
+
+  public InterpreterResult interpret(String code,
+                                     String codeType,
+                                     String paragraphId,
+                                     boolean displayAppInfo,
+                                     boolean appendSessionExpired) throws 
LivyException {
     StatementInfo stmtInfo = null;
     boolean sessionExpired = false;
     try {
       try {
-        stmtInfo = executeStatement(new ExecuteRequest(code));
+        stmtInfo = executeStatement(new ExecuteRequest(code, codeType));
       } catch (SessionNotFoundException e) {
         LOGGER.warn("Livy session {} is expired, new session will be 
created.", sessionInfo.id);
         sessionExpired = true;
@@ -328,7 +385,7 @@ public abstract class BaseLivyInterpreter extends 
Interpreter {
             initLivySession();
           }
         }
-        stmtInfo = executeStatement(new ExecuteRequest(code));
+        stmtInfo = executeStatement(new ExecuteRequest(code, codeType));
       }
 
       // pull the statement status
@@ -731,11 +788,12 @@ public abstract class BaseLivyInterpreter extends 
Interpreter {
     }
   }
 
-  private static class ExecuteRequest {
+  static class ExecuteRequest {
     public final String code;
-
-    public ExecuteRequest(String code) {
+    public final String kind;
+    public ExecuteRequest(String code, String kind) {
       this.code = code;
+      this.kind = kind;
     }
 
     public String toJson() {

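A note on the `getCodeType()` method added above: shared Livy sessions tag each statement with `spark`, `pyspark`, `sparkr` or `sql`, so Zeppelin's internal `pyspark3` session kind has to be normalized before submission. A standalone sketch of that mapping (the set of accepted kinds is an assumption based on LIVY-194, not spelled out in this patch):

```java
// Sketch of the normalization in BaseLivyInterpreter.getCodeType():
// shared Livy sessions accept per-statement kinds such as "spark",
// "pyspark", "sparkr" and "sql", so the internal "pyspark3" session
// kind is mapped back to "pyspark" before a statement is submitted.
public class CodeTypeSketch {
  static String toCodeType(String sessionKind) {
    if ("pyspark3".equalsIgnoreCase(sessionKind)) {
      return "pyspark";
    }
    return sessionKind;
  }

  public static void main(String[] args) {
    System.out.println(toCodeType("pyspark3")); // pyspark
    System.out.println(toCodeType("sql"));      // sql
  }
}
```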
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ad6e6918/livy/src/main/java/org/apache/zeppelin/livy/LivySharedInterpreter.java
----------------------------------------------------------------------
diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivySharedInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivySharedInterpreter.java
new file mode 100644
index 0000000..77e288b
--- /dev/null
+++ b/livy/src/main/java/org/apache/zeppelin/livy/LivySharedInterpreter.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.livy;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+/**
+ * Livy Interpreter for the shared kind, which shares SparkContext across spark/pyspark/r
+ */
+public class LivySharedInterpreter extends BaseLivyInterpreter {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(LivySharedInterpreter.class);
+
+  private boolean isSupported = false;
+
+  public LivySharedInterpreter(Properties property) {
+    super(property);
+  }
+
+  @Override
+  public void open() throws InterpreterException {
+    try {
+      // check livy version
+      try {
+        this.livyVersion = getLivyVersion();
+        LOGGER.info("Use livy " + livyVersion);
+      } catch (APINotFoundException e) {
+        // assume it is livy 0.2.0 when livy doesn't support rest api of 
fetching version.
+        this.livyVersion = new LivyVersion("0.2.0");
+        LOGGER.info("Use livy 0.2.0");
+      }
+
+      if (livyVersion.isSharedSupported()) {
+        LOGGER.info("LivySharedInterpreter is supported.");
+        isSupported = true;
+        initLivySession();
+      } else {
+        LOGGER.info("LivySharedInterpreter is not supported.");
+        isSupported = false;
+      }
+    } catch (LivyException e) {
+      String msg = "Fail to create session, please check livy interpreter log 
and " +
+          "livy server log";
+      throw new InterpreterException(msg, e);
+    }
+  }
+
+  public boolean isSupported() {
+    return isSupported;
+  }
+
+  public InterpreterResult interpret(String st, String codeType, 
InterpreterContext context) {
+    if (StringUtils.isEmpty(st)) {
+      return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
+    }
+
+    try {
+      return interpret(st, codeType, context.getParagraphId(), 
this.displayAppInfo, true);
+    } catch (LivyException e) {
+      LOGGER.error("Fail to interpret:" + st, e);
+      return new InterpreterResult(InterpreterResult.Code.ERROR,
+          InterpreterUtils.getMostRelevantMessage(e));
+    }
+  }
+
+  @Override
+  public String getSessionKind() {
+    return "shared";
+  }
+
+  @Override
+  protected String extractAppId() throws LivyException {
+    return null;
+  }
+
+  @Override
+  protected String extractWebUIAddress() throws LivyException {
+    return null;
+  }
+
+  public static void main(String[] args) {
+    ExecuteRequest request = new ExecuteRequest("1+1", null);
+    System.out.println(request.toJson());
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ad6e6918/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java
----------------------------------------------------------------------
diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java
index 606ef64..066d0da 100644
--- a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java
+++ b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java
@@ -44,6 +44,7 @@ public class LivySparkInterpreter extends BaseLivyInterpreter 
{
   protected String extractWebUIAddress() throws LivyException {
     interpret(
         "val 
webui=sc.getClass.getMethod(\"ui\").invoke(sc).asInstanceOf[Some[_]].get",
+        null,
         null, false, false);
     return extractStatementResult(
         interpret(

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ad6e6918/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java
----------------------------------------------------------------------
diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java
index 7b2d7d6..2faa350 100644
--- a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java
+++ b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java
@@ -19,11 +19,14 @@ package org.apache.zeppelin.livy;
 
 import org.apache.commons.lang.StringUtils;
 import static org.apache.commons.lang.StringEscapeUtils.escapeJavaScript;
+import org.apache.zeppelin.display.GUI;
 import org.apache.zeppelin.interpreter.*;
 import org.apache.zeppelin.scheduler.Scheduler;
 import org.apache.zeppelin.scheduler.SchedulerFactory;
+import org.apache.zeppelin.user.AuthenticationInfo;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Properties;
 
@@ -39,6 +42,7 @@ public class LivySparkSQLInterpreter extends 
BaseLivyInterpreter {
       "zeppelin.livy.spark.sql.maxResult";
 
   private LivySparkInterpreter sparkInterpreter;
+  private String codeType = null;
 
   private boolean isSpark2 = false;
   private int maxResult = 1000;
@@ -64,7 +68,21 @@ public class LivySparkSQLInterpreter extends 
BaseLivyInterpreter {
     // As we don't know whether livyserver use spark2 or spark1, so we will 
detect SparkSession
     // to judge whether it is using spark2.
     try {
-      InterpreterResult result = sparkInterpreter.interpret("spark", null, 
false, false);
+      InterpreterContext context = new InterpreterContext(
+          "noteId",
+          "paragraphId",
+          "replName",
+          "paragraphTitle",
+          "paragraphText",
+          new AuthenticationInfo(),
+          new HashMap<String, Object>(),
+          new GUI(),
+          new GUI(),
+          null,
+          null,
+          null,
+          new InterpreterOutput(null));
+      InterpreterResult result = sparkInterpreter.interpret("spark", context);
       if (result.code() == InterpreterResult.Code.SUCCESS &&
           
result.message().get(0).getData().contains("org.apache.spark.sql.SparkSession"))
 {
         LOGGER.info("SparkSession is detected so we are using spark 2.x for 
session {}",
@@ -72,7 +90,7 @@ public class LivySparkSQLInterpreter extends 
BaseLivyInterpreter {
         isSpark2 = true;
       } else {
         // spark 1.x
-        result = sparkInterpreter.interpret("sqlContext", null, false, false);
+        result = sparkInterpreter.interpret("sqlContext", context);
         if (result.code() == InterpreterResult.Code.SUCCESS) {
           LOGGER.info("sqlContext is detected.");
         } else if (result.code() == InterpreterResult.Code.ERROR) {
@@ -81,7 +99,7 @@ public class LivySparkSQLInterpreter extends 
BaseLivyInterpreter {
           LOGGER.info("sqlContext is not detected, try to create SQLContext by 
ourselves");
           result = sparkInterpreter.interpret(
               "val sqlContext = new org.apache.spark.sql.SQLContext(sc)\n"
-                  + "import sqlContext.implicits._", null, false, false);
+                  + "import sqlContext.implicits._", context);
           if (result.code() == InterpreterResult.Code.ERROR) {
             throw new LivyException("Fail to create SQLContext," +
                 result.message().get(0).getData());
@@ -128,9 +146,7 @@ public class LivySparkSQLInterpreter extends 
BaseLivyInterpreter {
         sqlQuery = "sqlContext.sql(\"\"\"" + line + "\"\"\").show(" + 
maxResult + ", " +
             truncate + ")";
       }
-      InterpreterResult result = sparkInterpreter.interpret(sqlQuery, 
context.getParagraphId(),
-          this.displayAppInfo, true);
-
+      InterpreterResult result = sparkInterpreter.interpret(sqlQuery, context);
       if (result.code() == InterpreterResult.Code.SUCCESS) {
         InterpreterResult result2 = new 
InterpreterResult(InterpreterResult.Code.SUCCESS);
         for (InterpreterResultMessage message : result.message()) {
@@ -248,12 +264,16 @@ public class LivySparkSQLInterpreter extends 
BaseLivyInterpreter {
 
   @Override
   public void cancel(InterpreterContext context) {
-    sparkInterpreter.cancel(context);
+    if (this.sparkInterpreter != null) {
+      sparkInterpreter.cancel(context);
+    }
   }
 
   @Override
   public void close() {
-    this.sparkInterpreter.close();
+    if (this.sparkInterpreter != null) {
+      this.sparkInterpreter.close();
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ad6e6918/livy/src/main/java/org/apache/zeppelin/livy/LivyVersion.java
----------------------------------------------------------------------
diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivyVersion.java b/livy/src/main/java/org/apache/zeppelin/livy/LivyVersion.java
index 7cfecfb..81bb8d4 100644
--- a/livy/src/main/java/org/apache/zeppelin/livy/LivyVersion.java
+++ b/livy/src/main/java/org/apache/zeppelin/livy/LivyVersion.java
@@ -29,6 +29,7 @@ public class LivyVersion {
   protected static final LivyVersion LIVY_0_2_0 = 
LivyVersion.fromVersionString("0.2.0");
   protected static final LivyVersion LIVY_0_3_0 = 
LivyVersion.fromVersionString("0.3.0");
   protected static final LivyVersion LIVY_0_4_0 = 
LivyVersion.fromVersionString("0.4.0");
+  protected static final LivyVersion LIVY_0_5_0 = 
LivyVersion.fromVersionString("0.5.0");
 
   private int version;
   private String versionString;
@@ -79,6 +80,10 @@ public class LivyVersion {
     return this.newerThanEquals(LIVY_0_4_0);
   }
 
+  public boolean isSharedSupported() {
+    return this.newerThanEquals(LIVY_0_5_0);
+  }
+
   public boolean equals(Object versionToCompare) {
     return version == ((LivyVersion) versionToCompare).version;
   }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ad6e6918/livy/src/main/resources/interpreter-setting.json
----------------------------------------------------------------------
diff --git a/livy/src/main/resources/interpreter-setting.json b/livy/src/main/resources/interpreter-setting.json
index 2d72487..cecacac 100644
--- a/livy/src/main/resources/interpreter-setting.json
+++ b/livy/src/main/resources/interpreter-setting.json
@@ -227,5 +227,21 @@
       "editOnDblClick": false,
       "completionKey": "TAB"
     }
+  },
+  {
+    "group": "livy",
+    "name": "shared",
+    "className": "org.apache.zeppelin.livy.LivySharedInterpreter",
+    "properties": {
+    },
+    "option": {
+      "remote": true,
+      "port": -1,
+      "perNote": "shared",
+      "perUser": "scoped",
+      "isExistingProcess": false,
+      "setPermission": false,
+      "users": []
+    }
   }
 ]

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ad6e6918/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
----------------------------------------------------------------------
diff --git a/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java b/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
index ef3eabe..3dfeb36 100644
--- a/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
+++ b/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
@@ -34,6 +34,7 @@ import java.util.Properties;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
 
 public class LivyInterpreterIT {
 
@@ -76,7 +77,7 @@ public class LivyInterpreterIT {
   }
 
 
-//  @Test
+  @Test
   public void testSparkInterpreterRDD() throws InterpreterException {
     if (!checkPreCondition()) {
       return;
@@ -197,7 +198,7 @@ public class LivyInterpreterIT {
   }
 
 
-//  @Test
+  @Test
   public void testSparkInterpreterDataFrame() throws InterpreterException {
     if (!checkPreCondition()) {
       return;
@@ -285,7 +286,7 @@ public class LivyInterpreterIT {
     }
   }
 
-//  @Test
+  @Test
   public void testSparkSQLInterpreter() throws InterpreterException {
     if (!checkPreCondition()) {
       return;
@@ -320,7 +321,7 @@ public class LivyInterpreterIT {
   }
 
 
-//  @Test
+  @Test
   public void testSparkSQLCancellation() throws InterpreterException {
     if (!checkPreCondition()) {
       return;
@@ -401,7 +402,7 @@ public class LivyInterpreterIT {
     }
   }
 
-//  @Test
+  @Test
   public void testStringWithTruncation() throws InterpreterException {
     if (!checkPreCondition()) {
       return;
@@ -462,7 +463,7 @@ public class LivyInterpreterIT {
   }
 
 
-//  @Test
+  @Test
   public void testStringWithoutTruncation() throws InterpreterException {
     if (!checkPreCondition()) {
       return;
@@ -534,6 +535,7 @@ public class LivyInterpreterIT {
     }
 
     final LivyPySparkInterpreter pysparkInterpreter = new 
LivyPySparkInterpreter(properties);
+    pysparkInterpreter.setInterpreterGroup(mock(InterpreterGroup.class));
     AuthenticationInfo authInfo = new AuthenticationInfo("user1");
     MyInterpreterOutputListener outputListener = new 
MyInterpreterOutputListener();
     InterpreterOutput output = new InterpreterOutput(outputListener);
@@ -647,7 +649,7 @@ public class LivyInterpreterIT {
     }
   }
 
-//  @Test
+  @Test
   public void testSparkInterpreterWithDisplayAppInfo() throws 
InterpreterException {
     if (!checkPreCondition()) {
       return;
@@ -686,13 +688,15 @@ public class LivyInterpreterIT {
     }
   }
 
-//  @Test
+  @Test
   public void testSparkRInterpreter() throws LivyException, 
InterpreterException {
     if (!checkPreCondition()) {
       return;
     }
 
     final LivySparkRInterpreter sparkRInterpreter = new 
LivySparkRInterpreter(properties);
+    sparkRInterpreter.setInterpreterGroup(mock(InterpreterGroup.class));
+
     try {
       sparkRInterpreter.getLivyVersion();
     } catch (APINotFoundException e) {
@@ -749,8 +753,7 @@ public class LivyInterpreterIT {
 
       // error
       result = sparkRInterpreter.interpret("cat(a)", context);
-      //TODO @zjffdu, it should be ERROR, it is due to bug of LIVY-313
-      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+      assertEquals(InterpreterResult.Code.ERROR, result.code());
       assertEquals(InterpreterResult.Type.TEXT, 
result.message().get(0).getType());
       assertTrue(result.message().get(0).getData().contains("object 'a' not 
found"));
     } finally {
@@ -758,7 +761,7 @@ public class LivyInterpreterIT {
     }
   }
 
-//  @Test
+  @Test
   public void testLivyTutorialNote() throws IOException, InterpreterException {
     if (!checkPreCondition()) {
       return;
@@ -796,6 +799,114 @@ public class LivyInterpreterIT {
     }
   }
 
+  @Test
+  public void testSharedInterpreter() throws InterpreterException {
+    if (!checkPreCondition()) {
+      return;
+    }
+    InterpreterGroup interpreterGroup = new InterpreterGroup("group_1");
+    interpreterGroup.put("session_1", new ArrayList<Interpreter>());
+    LazyOpenInterpreter sparkInterpreter = new LazyOpenInterpreter(
+        new LivySparkInterpreter(properties));
+    sparkInterpreter.setInterpreterGroup(interpreterGroup);
+    interpreterGroup.get("session_1").add(sparkInterpreter);
+
+    LazyOpenInterpreter sqlInterpreter = new LazyOpenInterpreter(
+        new LivySparkSQLInterpreter(properties));
+    interpreterGroup.get("session_1").add(sqlInterpreter);
+    sqlInterpreter.setInterpreterGroup(interpreterGroup);
+
+    LazyOpenInterpreter pysparkInterpreter = new LazyOpenInterpreter(
+        new LivyPySparkInterpreter(properties));
+    interpreterGroup.get("session_1").add(pysparkInterpreter);
+    pysparkInterpreter.setInterpreterGroup(interpreterGroup);
+
+    LazyOpenInterpreter sparkRInterpreter = new LazyOpenInterpreter(
+        new LivySparkRInterpreter(properties));
+    interpreterGroup.get("session_1").add(sparkRInterpreter);
+    sparkRInterpreter.setInterpreterGroup(interpreterGroup);
+
+    LazyOpenInterpreter sharedInterpreter = new LazyOpenInterpreter(
+        new LivySharedInterpreter(properties));
+    interpreterGroup.get("session_1").add(sharedInterpreter);
+    sharedInterpreter.setInterpreterGroup(interpreterGroup);
+
+    sparkInterpreter.open();
+    sqlInterpreter.open();
+    pysparkInterpreter.open();
+    sparkRInterpreter.open();
+
+    try {
+      AuthenticationInfo authInfo = new AuthenticationInfo("user1");
+      MyInterpreterOutputListener outputListener = new 
MyInterpreterOutputListener();
+      InterpreterOutput output = new InterpreterOutput(outputListener);
+      InterpreterContext context = new InterpreterContext("noteId", 
"paragraphId", "livy.sql",
+          "title", "text", authInfo, null, null, null, null, null, null, 
output);
+      // detect spark version
+      InterpreterResult result = sparkInterpreter.interpret("sc.version", 
context);
+      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+      assertEquals(1, result.message().size());
+
+      boolean isSpark2 = 
isSpark2((BaseLivyInterpreter)sparkInterpreter.getInnerInterpreter(), context);
+
+      if (!isSpark2) {
+        result = sparkInterpreter.interpret(
+            "val 
df=sqlContext.createDataFrame(Seq((\"hello\",20))).toDF(\"col_1\", \"col_2\")\n"
+                + "df.collect()", context);
+        assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+        assertEquals(1, result.message().size());
+        assertTrue(result.message().get(0).getData()
+            .contains("Array[org.apache.spark.sql.Row] = Array([hello,20])"));
+        sparkInterpreter.interpret("df.registerTempTable(\"df\")", context);
+
+        // access table from pyspark
+        result = pysparkInterpreter.interpret("sqlContext.sql(\"select * from 
df\").show()", context);
+        assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+        assertEquals(1, result.message().size());
+        assertTrue(result.message().get(0).getData()
+            .contains("+-----+-----+\n" +
+                "|col_1|col_2|\n" +
+                "+-----+-----+\n" +
+                "|hello|   20|\n" +
+                "+-----+-----+"));
+
+        // access table from sparkr
+        result = sparkRInterpreter.interpret("head(sql(sqlContext, \"select * 
from df\"))", context);
+        assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+        assertEquals(1, result.message().size());
+        assertTrue(result.message().get(0).getData().contains("col_1 col_2\n1 
hello    20"));
+      } else {
+        result = sparkInterpreter.interpret(
+            "val df=spark.createDataFrame(Seq((\"hello\",20))).toDF(\"col_1\", 
\"col_2\")\n"
+                + "df.collect()", context);
+        assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+        assertEquals(1, result.message().size());
+        assertTrue(result.message().get(0).getData()
+            .contains("Array[org.apache.spark.sql.Row] = Array([hello,20])"));
+        sparkInterpreter.interpret("df.registerTempTable(\"df\")", context);
+
+        // access table from pyspark
+        result = pysparkInterpreter.interpret("spark.sql(\"select * from 
df\").show()", context);
+        assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+        assertEquals(1, result.message().size());
+        assertTrue(result.message().get(0).getData()
+            .contains("+-----+-----+\n" +
+                      "|col_1|col_2|\n" +
+                      "+-----+-----+\n" +
+                      "|hello|   20|\n" +
+                      "+-----+-----+"));
+
+        // access table from sparkr
+        result = sparkRInterpreter.interpret("head(sql(\"select * from 
df\"))", context);
+        assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+        assertEquals(1, result.message().size());
+        assertTrue(result.message().get(0).getData().contains("col_1 col_2\n1 
hello    20"));
+      }
+    } finally {
+      sparkInterpreter.close();
+      sqlInterpreter.close();
+    }
+  }
 
   private boolean isSpark2(BaseLivyInterpreter interpreter, InterpreterContext 
context) {
     InterpreterResult result = null;
