Repository: zeppelin Updated Branches: refs/heads/master b3ba458bd -> ad6e69181
[ZEPPELIN-2909]. Support shared SparkContext across languages in livy interpreter ### What is this PR for? LIVY-194 implements a shared SparkContext across languages; this ticket integrates that feature. ### What type of PR is it? [ Feature ] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-2909 ### How should this be tested? Tests are added ### Screenshots (if appropriate) ### Questions: * Do the license files need to be updated? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #2587 from zjffdu/ZEPPELIN-2909 and squashes the following commits: d6e38e6 [Jeff Zhang] [ZEPPELIN-2909]. Support shared SparkContext across languages in livy interpreter Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/ad6e6918 Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/ad6e6918 Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/ad6e6918 Branch: refs/heads/master Commit: ad6e691812957e240d37918bbeae4a3c537fcccf Parents: b3ba458 Author: Jeff Zhang <zjf...@apache.org> Authored: Wed Aug 23 20:34:15 2017 +0800 Committer: Jeff Zhang <zjf...@apache.org> Committed: Thu Feb 8 21:38:05 2018 +0800 ---------------------------------------------------------------------- .travis.yml | 8 +- docs/interpreter/livy.md | 4 + livy/pom.xml | 148 ++++++++++++++++++- .../zeppelin/livy/BaseLivyInterpreter.java | 88 +++++++++-- .../zeppelin/livy/LivySharedInterpreter.java | 108 ++++++++++++++ .../zeppelin/livy/LivySparkInterpreter.java | 1 + .../zeppelin/livy/LivySparkSQLInterpreter.java | 36 ++++- .../org/apache/zeppelin/livy/LivyVersion.java | 5 + .../src/main/resources/interpreter-setting.json | 16 ++ .../apache/zeppelin/livy/LivyInterpreterIT.java | 133 +++++++++++++++-- 10 files changed, 508 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ad6e6918/.travis.yml ---------------------------------------------------------------------- diff --git a/.travis.yml b/.travis.yml index ce935b2..24da368 100644 --- a/.travis.yml +++ b/.travis.yml @@ -107,17 +107,17 @@ matrix: dist: trusty env: PYTHON="2" SCALA_VER="2.11" SPARK_VER="1.6.3" HADOOP_VER="2.6" PROFILE="-Pweb-ci -Pspark-1.6 -Phadoop3 -Phadoop-2.6 -Pscala-2.11" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat" TEST_FLAG="test -DskipRat" MODULES="-pl .,zeppelin-interpreter,zeppelin-zengine,zeppelin-server,zeppelin-display,spark/interpreter,spark/scala-2.10,spark/scala-2.11,spark/spark-dependencies,python" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest,org.apache.zeppelin.spark.* -DfailIfNoTests=false" - # Test python/pyspark with python 2, livy 0.2 + # Test python/pyspark with python 2, livy 0.5 - sudo: required dist: trusty jdk: "openjdk7" - env: PYTHON="2" SCALA_VER="2.10" SPARK_VER="1.6.1" HADOOP_VER="2.6" LIVY_VER="0.4.0-incubating" PROFILE="-Pspark-1.6 -Phadoop2 -Phadoop-2.6 -Plivy-0.2 -Pscala-2.10" BUILD_FLAG="install -am -DskipTests -DskipRat" TEST_FLAG="verify -DskipRat" MODULES="-pl .,zeppelin-interpreter,zeppelin-display,spark/interpreter,spark/scala-2.10,spark/scala-2.11,spark/spark-dependencies,python,livy" TEST_PROJECTS="-Dtest=LivySQLInterpreterTest,org.apache.zeppelin.spark.PySpark*Test,org.apache.zeppelin.python.* -Dpyspark.test.exclude='' -DfailIfNoTests=false" + env: PYTHON="2" SCALA_VER="2.10" SPARK_VER="1.6.3"
HADOOP_VER="2.6" LIVY_VER="0.5.0-incubating" PROFILE="-Pspark-1.6 -Phadoop2 -Phadoop-2.6 -Pscala-2.10" BUILD_FLAG="install -am -DskipTests -DskipRat" TEST_FLAG="verify -DskipRat" MODULES="-pl .,zeppelin-interpreter,zeppelin-display,spark/interpreter,spark/scala-2.10,spark/scala-2.11,spark/spark-dependencies,python,livy" TEST_PROJECTS="-Dtest=LivySQLInterpreterTest,org.apache.zeppelin.spark.PySpark*Test,org.apache.zeppelin.python.* -Dpyspark.test.exclude='' -DfailIfNoTests=false" - # Test python/pyspark with python 3, livy 0.3 + # Test python/pyspark with python 3, livy 0.5 - sudo: required dist: trusty jdk: "openjdk7" - env: PYTHON="3" SCALA_VER="2.11" SPARK_VER="2.0.0" HADOOP_VER="2.6" LIVY_VER="0.4.0-incubating" PROFILE="-Pspark-2.0 -Phadoop3 -Phadoop-2.6 -Pscala-2.11 -Plivy-0.3" BUILD_FLAG="install -am -DskipTests -DskipRat" TEST_FLAG="verify -DskipRat" MODULES="-pl .,zeppelin-interpreter,zeppelin-display,spark/interpreter,spark/scala-2.10,spark/scala-2.11,spark/spark-dependencies,python,livy" TEST_PROJECTS="-Dtest=LivySQLInterpreterTest,org.apache.zeppelin.spark.PySpark*Test,org.apache.zeppelin.python.* -Dpyspark.test.exclude='' -DfailIfNoTests=false" + env: PYTHON="3" SCALA_VER="2.11" SPARK_VER="2.0.0" HADOOP_VER="2.6" LIVY_VER="0.5.0-incubating" PROFILE="-Pspark-2.0 -Phadoop3 -Phadoop-2.6 -Pscala-2.11" BUILD_FLAG="install -am -DskipTests -DskipRat" TEST_FLAG="verify -DskipRat" MODULES="-pl .,zeppelin-interpreter,zeppelin-display,spark/interpreter,spark/scala-2.10,spark/scala-2.11,spark/spark-dependencies,python,livy" TEST_PROJECTS="-Dtest=LivySQLInterpreterTest,org.apache.zeppelin.spark.PySpark*Test,org.apache.zeppelin.python.* -Dpyspark.test.exclude='' -DfailIfNoTests=false" before_install: # check files included in commit range, clear bower_components if a bower.json file has changed. http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ad6e6918/docs/interpreter/livy.md ---------------------------------------------------------------------- diff --git a/docs/interpreter/livy.md b/docs/interpreter/livy.md index d53672a..e4784d4 100644 --- a/docs/interpreter/livy.md +++ b/docs/interpreter/livy.md @@ -216,6 +216,10 @@ select * from products where ${product_id=1} And creating dynamic formst programmatically is not feasible in livy interpreter, because ZeppelinContext is not available in livy interpreter. +## Shared SparkContext +Starting from livy 0.5 which is supported by Zeppelin 0.8.0, SparkContext is shared between scala, python, r and sql. +That means you can query the table via `%livy.sql` when this table is registered in `%livy.spark`, `%livy.pyspark`, `$livy.sparkr`. 
+ ## FAQ Livy debugging: If you see any of these in error console http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ad6e6918/livy/pom.xml ---------------------------------------------------------------------- diff --git a/livy/pom.xml b/livy/pom.xml index 1c9d8fb..eddeb83 100644 --- a/livy/pom.xml +++ b/livy/pom.xml @@ -42,8 +42,9 @@ <spring.security.kerberosclient>1.0.1.RELEASE</spring.security.kerberosclient> <!--test library versions--> - <livy.version>0.4.0-incubating</livy.version> + <livy.version>0.5.0-incubating</livy.version> <spark.version>2.1.0</spark.version> + <hadoop.version>2.6.0</hadoop.version> <!--plugin versions--> <plugin.failsafe.version>2.16</plugin.failsafe.version> <plugin.antrun.version>1.8</plugin.antrun.version> @@ -105,6 +106,30 @@ <groupId>org.apache.spark</groupId> <artifactId>spark-yarn_${scala.binary.version}</artifactId> </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-auth</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-client</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-tests</artifactId> + </exclusion> </exclusions> </dependency> <dependency> @@ -188,6 +213,127 @@ <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-auth</artifactId> + <version>${hadoop.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${hadoop.version}</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <classifier>tests</classifier> + <version>${hadoop.version}</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <version>${hadoop.version}</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <classifier>tests</classifier> + <version>${hadoop.version}</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + <version>${hadoop.version}</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </exclusion> + 
</exclusions> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-client</artifactId> + <version>${hadoop.version}</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-api</artifactId> + <version>${hadoop.version}</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-tests</artifactId> + <classifier>tests</classifier> + <version>${hadoop.version}</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </exclusion> + </exclusions> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ad6e6918/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterpreter.java ---------------------------------------------------------------------- diff --git a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterpreter.java index f3b7579..724a4b3 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterpreter.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterpreter.java @@ -57,6 +57,8 @@ import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResultMessage; import org.apache.zeppelin.interpreter.InterpreterUtils; +import org.apache.zeppelin.interpreter.LazyOpenInterpreter; +import org.apache.zeppelin.interpreter.WrappedInterpreter; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -95,6 +97,9 @@ public abstract class BaseLivyInterpreter extends Interpreter { private RestTemplate restTemplate; private Map<String, String> customHeaders = new HashMap<>(); + // delegate to sharedInterpreter when it is available + protected LivySharedInterpreter sharedInterpreter; + Set<Object> paragraphsToCancel = Collections.newSetFromMap( new ConcurrentHashMap<Object, Boolean>()); private ConcurrentHashMap<String, Integer> paragraphId2StmtProgressMap = @@ -144,7 +149,13 @@ public abstract class BaseLivyInterpreter extends Interpreter { @Override public void open() throws InterpreterException { try { - initLivySession(); + this.livyVersion = getLivyVersion(); + if (this.livyVersion.isSharedSupported()) { + sharedInterpreter = getLivySharedInterpreter(); + } + if (sharedInterpreter == null || !sharedInterpreter.isSupported()) { + initLivySession(); + } } catch (LivyException e) { String msg = "Fail to create session, please check livy interpreter log and " + "livy server log"; @@ -152,8 +163,32 @@ public abstract class BaseLivyInterpreter extends Interpreter { } } + protected LivySharedInterpreter getLivySharedInterpreter() throws InterpreterException { + LazyOpenInterpreter lazy = null; + LivySharedInterpreter sharedInterpreter = null; + Interpreter p = getInterpreterInTheSameSessionByClassName( + LivySharedInterpreter.class.getName()); + + while (p instanceof WrappedInterpreter) { + if (p instanceof LazyOpenInterpreter) { + lazy = (LazyOpenInterpreter) p; + } + 
p = ((WrappedInterpreter) p).getInnerInterpreter(); + } + sharedInterpreter = (LivySharedInterpreter) p; + + if (lazy != null) { + lazy.open(); + } + return sharedInterpreter; + } + @Override public void close() { + if (sharedInterpreter != null && sharedInterpreter.isSupported()) { + sharedInterpreter.close(); + return; + } if (sessionInfo != null) { closeSession(sessionInfo.id); // reset sessionInfo to null so that we won't close it twice. @@ -181,14 +216,6 @@ public abstract class BaseLivyInterpreter extends Interpreter { } else { LOGGER.info("Create livy session successfully with sessionId: {}", this.sessionInfo.id); } - // check livy version - try { - this.livyVersion = getLivyVersion(); - LOGGER.info("Use livy " + livyVersion); - } catch (APINotFoundException e) { - this.livyVersion = new LivyVersion("0.2.0"); - LOGGER.info("Use livy 0.2.0"); - } } protected abstract String extractAppId() throws LivyException; @@ -196,17 +223,30 @@ public abstract class BaseLivyInterpreter extends Interpreter { protected abstract String extractWebUIAddress() throws LivyException; public SessionInfo getSessionInfo() { + if (sharedInterpreter != null && sharedInterpreter.isSupported()) { + return sharedInterpreter.getSessionInfo(); + } return sessionInfo; } + public String getCodeType() { + if (getSessionKind().equalsIgnoreCase("pyspark3")) { + return "pyspark"; + } + return getSessionKind(); + } + @Override public InterpreterResult interpret(String st, InterpreterContext context) { + if (sharedInterpreter != null && sharedInterpreter.isSupported()) { + return sharedInterpreter.interpret(st, getCodeType(), context); + } if (StringUtils.isEmpty(st)) { return new InterpreterResult(InterpreterResult.Code.SUCCESS, ""); } try { - return interpret(st, context.getParagraphId(), this.displayAppInfo, true); + return interpret(st, null, context.getParagraphId(), this.displayAppInfo, true); } catch (LivyException e) { LOGGER.error("Fail to interpret:" + st, e); return new InterpreterResult(InterpreterResult.Code.ERROR, @@ -245,6 +285,10 @@ public abstract class BaseLivyInterpreter extends Interpreter { @Override public void cancel(InterpreterContext context) { + if (sharedInterpreter != null && sharedInterpreter.isSupported()) { + sharedInterpreter.cancel(context); + return; + } paragraphsToCancel.add(context.getParagraphId()); LOGGER.info("Added paragraph " + context.getParagraphId() + " for cancellation."); } @@ -256,6 +300,10 @@ public abstract class BaseLivyInterpreter extends Interpreter { @Override public int getProgress(InterpreterContext context) { + if (sharedInterpreter != null && sharedInterpreter.isSupported()) { + return sharedInterpreter.getProgress(context); + } + if (livyVersion.isGetProgressSupported()) { String paraId = context.getParagraphId(); Integer progress = paragraphId2StmtProgressMap.get(paraId); @@ -312,11 +360,20 @@ public abstract class BaseLivyInterpreter extends Interpreter { String paragraphId, boolean displayAppInfo, boolean appendSessionExpired) throws LivyException { + return interpret(code, sharedInterpreter.isSupported() ? 
getSessionKind() : null, + paragraphId, displayAppInfo, appendSessionExpired); + } + + public InterpreterResult interpret(String code, + String codeType, + String paragraphId, + boolean displayAppInfo, + boolean appendSessionExpired) throws LivyException { StatementInfo stmtInfo = null; boolean sessionExpired = false; try { try { - stmtInfo = executeStatement(new ExecuteRequest(code)); + stmtInfo = executeStatement(new ExecuteRequest(code, codeType)); } catch (SessionNotFoundException e) { LOGGER.warn("Livy session {} is expired, new session will be created.", sessionInfo.id); sessionExpired = true; @@ -328,7 +385,7 @@ public abstract class BaseLivyInterpreter extends Interpreter { initLivySession(); } } - stmtInfo = executeStatement(new ExecuteRequest(code)); + stmtInfo = executeStatement(new ExecuteRequest(code, codeType)); } // pull the statement status @@ -731,11 +788,12 @@ public abstract class BaseLivyInterpreter extends Interpreter { } } - private static class ExecuteRequest { + static class ExecuteRequest { public final String code; - - public ExecuteRequest(String code) { + public final String kind; + public ExecuteRequest(String code, String kind) { this.code = code; + this.kind = kind; } public String toJson() { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ad6e6918/livy/src/main/java/org/apache/zeppelin/livy/LivySharedInterpreter.java ---------------------------------------------------------------------- diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivySharedInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivySharedInterpreter.java new file mode 100644 index 0000000..77e288b --- /dev/null +++ b/livy/src/main/java/org/apache/zeppelin/livy/LivySharedInterpreter.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.livy; + +import org.apache.commons.lang.StringUtils; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Properties; + +/** + * Livy Interpreter for shared kind which share SparkContext across spark/pyspark/r + */ +public class LivySharedInterpreter extends BaseLivyInterpreter { + + private static final Logger LOGGER = LoggerFactory.getLogger(LivySharedInterpreter.class); + + private boolean isSupported = false; + + public LivySharedInterpreter(Properties property) { + super(property); + } + + @Override + public void open() throws InterpreterException { + try { + // check livy version + try { + this.livyVersion = getLivyVersion(); + LOGGER.info("Use livy " + livyVersion); + } catch (APINotFoundException e) { + // assume it is livy 0.2.0 when livy doesn't support rest api of fetching version. + this.livyVersion = new LivyVersion("0.2.0"); + LOGGER.info("Use livy 0.2.0"); + } + + if (livyVersion.isSharedSupported()) { + LOGGER.info("LivySharedInterpreter is supported."); + isSupported = true; + initLivySession(); + } else { + LOGGER.info("LivySharedInterpreter is not supported."); + isSupported = false; + } + } catch (LivyException e) { + String msg = "Fail to create session, please check livy interpreter log and " + + "livy server log"; + throw new InterpreterException(msg, e); + } + } + + public boolean isSupported() { + return isSupported; + } + + public InterpreterResult interpret(String st, String codeType, InterpreterContext context) { + if (StringUtils.isEmpty(st)) { + return new InterpreterResult(InterpreterResult.Code.SUCCESS, ""); + } + + try { + return interpret(st, codeType, context.getParagraphId(), this.displayAppInfo, true); + } catch (LivyException e) { + LOGGER.error("Fail to interpret:" + st, e); + return new InterpreterResult(InterpreterResult.Code.ERROR, + InterpreterUtils.getMostRelevantMessage(e)); + } + } + + @Override + public String getSessionKind() { + return "shared"; + } + + @Override + protected String extractAppId() throws LivyException { + return null; + } + + @Override + protected String extractWebUIAddress() throws LivyException { + return null; + } + + public static void main(String[] args) { + ExecuteRequest request = new ExecuteRequest("1+1", null); + System.out.println(request.toJson()); + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ad6e6918/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java ---------------------------------------------------------------------- diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java index 606ef64..066d0da 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java @@ -44,6 +44,7 @@ public class LivySparkInterpreter extends BaseLivyInterpreter { protected String extractWebUIAddress() throws LivyException { interpret( "val webui=sc.getClass.getMethod(\"ui\").invoke(sc).asInstanceOf[Some[_]].get", + null, null, false, false); return extractStatementResult( interpret( http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ad6e6918/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java 
---------------------------------------------------------------------- diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java index 7b2d7d6..2faa350 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java @@ -19,11 +19,14 @@ package org.apache.zeppelin.livy; import org.apache.commons.lang.StringUtils; import static org.apache.commons.lang.StringEscapeUtils.escapeJavaScript; +import org.apache.zeppelin.display.GUI; import org.apache.zeppelin.interpreter.*; import org.apache.zeppelin.scheduler.Scheduler; import org.apache.zeppelin.scheduler.SchedulerFactory; +import org.apache.zeppelin.user.AuthenticationInfo; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Properties; @@ -39,6 +42,7 @@ public class LivySparkSQLInterpreter extends BaseLivyInterpreter { "zeppelin.livy.spark.sql.maxResult"; private LivySparkInterpreter sparkInterpreter; + private String codeType = null; private boolean isSpark2 = false; private int maxResult = 1000; @@ -64,7 +68,21 @@ public class LivySparkSQLInterpreter extends BaseLivyInterpreter { // As we don't know whether livyserver use spark2 or spark1, so we will detect SparkSession // to judge whether it is using spark2. try { - InterpreterResult result = sparkInterpreter.interpret("spark", null, false, false); + InterpreterContext context = new InterpreterContext( + "noteId", + "paragraphId", + "replName", + "paragraphTitle", + "paragraphText", + new AuthenticationInfo(), + new HashMap<String, Object>(), + new GUI(), + new GUI(), + null, + null, + null, + new InterpreterOutput(null)); + InterpreterResult result = sparkInterpreter.interpret("spark", context); if (result.code() == InterpreterResult.Code.SUCCESS && result.message().get(0).getData().contains("org.apache.spark.sql.SparkSession")) { LOGGER.info("SparkSession is detected so we are using spark 2.x for session {}", @@ -72,7 +90,7 @@ public class LivySparkSQLInterpreter extends BaseLivyInterpreter { isSpark2 = true; } else { // spark 1.x - result = sparkInterpreter.interpret("sqlContext", null, false, false); + result = sparkInterpreter.interpret("sqlContext", context); if (result.code() == InterpreterResult.Code.SUCCESS) { LOGGER.info("sqlContext is detected."); } else if (result.code() == InterpreterResult.Code.ERROR) { @@ -81,7 +99,7 @@ public class LivySparkSQLInterpreter extends BaseLivyInterpreter { LOGGER.info("sqlContext is not detected, try to create SQLContext by ourselves"); result = sparkInterpreter.interpret( "val sqlContext = new org.apache.spark.sql.SQLContext(sc)\n" - + "import sqlContext.implicits._", null, false, false); + + "import sqlContext.implicits._", context); if (result.code() == InterpreterResult.Code.ERROR) { throw new LivyException("Fail to create SQLContext," + result.message().get(0).getData()); @@ -128,9 +146,7 @@ public class LivySparkSQLInterpreter extends BaseLivyInterpreter { sqlQuery = "sqlContext.sql(\"\"\"" + line + "\"\"\").show(" + maxResult + ", " + truncate + ")"; } - InterpreterResult result = sparkInterpreter.interpret(sqlQuery, context.getParagraphId(), - this.displayAppInfo, true); - + InterpreterResult result = sparkInterpreter.interpret(sqlQuery, context); if (result.code() == InterpreterResult.Code.SUCCESS) { InterpreterResult result2 = new InterpreterResult(InterpreterResult.Code.SUCCESS); for 
(InterpreterResultMessage message : result.message()) { @@ -248,12 +264,16 @@ public class LivySparkSQLInterpreter extends BaseLivyInterpreter { @Override public void cancel(InterpreterContext context) { - sparkInterpreter.cancel(context); + if (this.sparkInterpreter != null) { + sparkInterpreter.cancel(context); + } } @Override public void close() { - this.sparkInterpreter.close(); + if (this.sparkInterpreter != null) { + this.sparkInterpreter.close(); + } } @Override http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ad6e6918/livy/src/main/java/org/apache/zeppelin/livy/LivyVersion.java ---------------------------------------------------------------------- diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivyVersion.java b/livy/src/main/java/org/apache/zeppelin/livy/LivyVersion.java index 7cfecfb..81bb8d4 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/LivyVersion.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/LivyVersion.java @@ -29,6 +29,7 @@ public class LivyVersion { protected static final LivyVersion LIVY_0_2_0 = LivyVersion.fromVersionString("0.2.0"); protected static final LivyVersion LIVY_0_3_0 = LivyVersion.fromVersionString("0.3.0"); protected static final LivyVersion LIVY_0_4_0 = LivyVersion.fromVersionString("0.4.0"); + protected static final LivyVersion LIVY_0_5_0 = LivyVersion.fromVersionString("0.5.0"); private int version; private String versionString; @@ -79,6 +80,10 @@ public class LivyVersion { return this.newerThanEquals(LIVY_0_4_0); } + public boolean isSharedSupported() { + return this.newerThanEquals(LIVY_0_5_0); + } + public boolean equals(Object versionToCompare) { return version == ((LivyVersion) versionToCompare).version; } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ad6e6918/livy/src/main/resources/interpreter-setting.json ---------------------------------------------------------------------- diff --git a/livy/src/main/resources/interpreter-setting.json b/livy/src/main/resources/interpreter-setting.json index 2d72487..cecacac 100644 --- a/livy/src/main/resources/interpreter-setting.json +++ b/livy/src/main/resources/interpreter-setting.json @@ -227,5 +227,21 @@ "editOnDblClick": false, "completionKey": "TAB" } + }, + { + "group": "livy", + "name": "shared", + "className": "org.apache.zeppelin.livy.LivySharedInterpreter", + "properties": { + }, + "option": { + "remote": true, + "port": -1, + "perNote": "shared", + "perUser": "scoped", + "isExistingProcess": false, + "setPermission": false, + "users": [] + } } ] http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ad6e6918/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java ---------------------------------------------------------------------- diff --git a/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java b/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java index ef3eabe..3dfeb36 100644 --- a/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java +++ b/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java @@ -34,6 +34,7 @@ import java.util.Properties; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; public class LivyInterpreterIT { @@ -76,7 +77,7 @@ public class LivyInterpreterIT { } -// @Test + @Test public void testSparkInterpreterRDD() throws InterpreterException { if (!checkPreCondition()) { return; @@ -197,7 +198,7 @@ public class LivyInterpreterIT { } -// @Test + @Test public void testSparkInterpreterDataFrame() throws 
InterpreterException { if (!checkPreCondition()) { return; @@ -285,7 +286,7 @@ public class LivyInterpreterIT { } } -// @Test + @Test public void testSparkSQLInterpreter() throws InterpreterException { if (!checkPreCondition()) { return; @@ -320,7 +321,7 @@ public class LivyInterpreterIT { } -// @Test + @Test public void testSparkSQLCancellation() throws InterpreterException { if (!checkPreCondition()) { return; @@ -401,7 +402,7 @@ public class LivyInterpreterIT { } } -// @Test + @Test public void testStringWithTruncation() throws InterpreterException { if (!checkPreCondition()) { return; @@ -462,7 +463,7 @@ public class LivyInterpreterIT { } -// @Test + @Test public void testStringWithoutTruncation() throws InterpreterException { if (!checkPreCondition()) { return; @@ -534,6 +535,7 @@ public class LivyInterpreterIT { } final LivyPySparkInterpreter pysparkInterpreter = new LivyPySparkInterpreter(properties); + pysparkInterpreter.setInterpreterGroup(mock(InterpreterGroup.class)); AuthenticationInfo authInfo = new AuthenticationInfo("user1"); MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener(); InterpreterOutput output = new InterpreterOutput(outputListener); @@ -647,7 +649,7 @@ public class LivyInterpreterIT { } } -// @Test + @Test public void testSparkInterpreterWithDisplayAppInfo() throws InterpreterException { if (!checkPreCondition()) { return; @@ -686,13 +688,15 @@ public class LivyInterpreterIT { } } -// @Test + @Test public void testSparkRInterpreter() throws LivyException, InterpreterException { if (!checkPreCondition()) { return; } final LivySparkRInterpreter sparkRInterpreter = new LivySparkRInterpreter(properties); + sparkRInterpreter.setInterpreterGroup(mock(InterpreterGroup.class)); + try { sparkRInterpreter.getLivyVersion(); } catch (APINotFoundException e) { @@ -749,8 +753,7 @@ public class LivyInterpreterIT { // error result = sparkRInterpreter.interpret("cat(a)", context); - //TODO @zjffdu, it should be ERROR, it is due to bug of LIVY-313 - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals(InterpreterResult.Code.ERROR, result.code()); assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType()); assertTrue(result.message().get(0).getData().contains("object 'a' not found")); } finally { @@ -758,7 +761,7 @@ public class LivyInterpreterIT { } } -// @Test + @Test public void testLivyTutorialNote() throws IOException, InterpreterException { if (!checkPreCondition()) { return; @@ -796,6 +799,114 @@ public class LivyInterpreterIT { } } + @Test + public void testSharedInterpreter() throws InterpreterException { + if (!checkPreCondition()) { + return; + } + InterpreterGroup interpreterGroup = new InterpreterGroup("group_1"); + interpreterGroup.put("session_1", new ArrayList<Interpreter>()); + LazyOpenInterpreter sparkInterpreter = new LazyOpenInterpreter( + new LivySparkInterpreter(properties)); + sparkInterpreter.setInterpreterGroup(interpreterGroup); + interpreterGroup.get("session_1").add(sparkInterpreter); + + LazyOpenInterpreter sqlInterpreter = new LazyOpenInterpreter( + new LivySparkSQLInterpreter(properties)); + interpreterGroup.get("session_1").add(sqlInterpreter); + sqlInterpreter.setInterpreterGroup(interpreterGroup); + + LazyOpenInterpreter pysparkInterpreter = new LazyOpenInterpreter( + new LivyPySparkInterpreter(properties)); + interpreterGroup.get("session_1").add(pysparkInterpreter); + pysparkInterpreter.setInterpreterGroup(interpreterGroup); + + LazyOpenInterpreter sparkRInterpreter = new 
LazyOpenInterpreter( + new LivySparkRInterpreter(properties)); + interpreterGroup.get("session_1").add(sparkRInterpreter); + sparkRInterpreter.setInterpreterGroup(interpreterGroup); + + LazyOpenInterpreter sharedInterpreter = new LazyOpenInterpreter( + new LivySharedInterpreter(properties)); + interpreterGroup.get("session_1").add(sharedInterpreter); + sharedInterpreter.setInterpreterGroup(interpreterGroup); + + sparkInterpreter.open(); + sqlInterpreter.open(); + pysparkInterpreter.open(); + sparkRInterpreter.open(); + + try { + AuthenticationInfo authInfo = new AuthenticationInfo("user1"); + MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener(); + InterpreterOutput output = new InterpreterOutput(outputListener); + InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.sql", + "title", "text", authInfo, null, null, null, null, null, null, output); + // detect spark version + InterpreterResult result = sparkInterpreter.interpret("sc.version", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals(1, result.message().size()); + + boolean isSpark2 = isSpark2((BaseLivyInterpreter)sparkInterpreter.getInnerInterpreter(), context); + + if (!isSpark2) { + result = sparkInterpreter.interpret( + "val df=sqlContext.createDataFrame(Seq((\"hello\",20))).toDF(\"col_1\", \"col_2\")\n" + + "df.collect()", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals(1, result.message().size()); + assertTrue(result.message().get(0).getData() + .contains("Array[org.apache.spark.sql.Row] = Array([hello,20])")); + sparkInterpreter.interpret("df.registerTempTable(\"df\")", context); + + // access table from pyspark + result = pysparkInterpreter.interpret("sqlContext.sql(\"select * from df\").show()", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals(1, result.message().size()); + assertTrue(result.message().get(0).getData() + .contains("+-----+-----+\n" + + "|col_1|col_2|\n" + + "+-----+-----+\n" + + "|hello| 20|\n" + + "+-----+-----+")); + + // access table from sparkr + result = sparkRInterpreter.interpret("head(sql(sqlContext, \"select * from df\"))", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals(1, result.message().size()); + assertTrue(result.message().get(0).getData().contains("col_1 col_2\n1 hello 20")); + } else { + result = sparkInterpreter.interpret( + "val df=spark.createDataFrame(Seq((\"hello\",20))).toDF(\"col_1\", \"col_2\")\n" + + "df.collect()", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals(1, result.message().size()); + assertTrue(result.message().get(0).getData() + .contains("Array[org.apache.spark.sql.Row] = Array([hello,20])")); + sparkInterpreter.interpret("df.registerTempTable(\"df\")", context); + + // access table from pyspark + result = pysparkInterpreter.interpret("spark.sql(\"select * from df\").show()", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals(1, result.message().size()); + assertTrue(result.message().get(0).getData() + .contains("+-----+-----+\n" + + "|col_1|col_2|\n" + + "+-----+-----+\n" + + "|hello| 20|\n" + + "+-----+-----+")); + + // access table from sparkr + result = sparkRInterpreter.interpret("head(sql(\"select * from df\"))", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals(1, result.message().size()); + 
assertTrue(result.message().get(0).getData().contains("col_1 col_2\n1 hello 20")); + } + } finally { + sparkInterpreter.close(); + sqlInterpreter.close(); + } + } private boolean isSpark2(BaseLivyInterpreter interpreter, InterpreterContext context) { InterpreterResult result = null;