Repository: zeppelin Updated Branches: refs/heads/master 483a89705 -> 8546666d5
[ZEPPELIN-759] Spark 2.0 support

### What is this PR for?
This PR implements Spark 2.0 support, based on #747. It takes the approach from #980, which reimplements the code in Scala.

You can try building this branch:
```
mvn clean package -Dscala-2.11 -Pspark-2.0 -Dspark.version=2.0.0-preview -Ppyspark -Psparkr -Pyarn -Phadoop-2.6 -DskipTests
```

### What type of PR is it?
Improvement

### Todos
* [x] - Spark 2.0 support
* [x] - Rebase after #747 merge
* [x] - Update LICENSE file
* [x] - Update related documentation (build)

### What is the Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-759

### How should this be tested?
Build and try:
```
mvn clean package -Dscala-2.11 -Pspark-2.0 -Dspark.version=2.0.0-preview -Ppyspark -Psparkr -Pyarn -Phadoop-2.6 -DskipTests
```

### Screenshots (if appropriate)

### Questions:
* Do the license files need an update? Yes
* Are there breaking changes for older versions? No
* Does this need documentation? Yes

Author: Lee moon soo <[email protected]>

Closes #1195 from Leemoonsoo/spark-20 and squashes the following commits:

d78b322 [Lee moon soo] trigger ci
8017e8b [Lee moon soo] Remove unnecessary spark.version property
e3141bd [Lee moon soo] restart sparkcluster before sparkr test
1493b2c [Lee moon soo] print spark standalone cluster log when ci test fails
a208cd0 [Lee moon soo] Debug sparkRTest
31369c6 [Lee moon soo] Update license
293896a [Lee moon soo] Update build instruction
862ff6c [Lee moon soo] Make ZeppelinSparkClusterTest.java work with spark 2
839912a [Lee moon soo] Update SPARK_HOME directory detection pattern for 2.0.0-preview in the test
3413707 [Lee moon soo] Update .travis.yml
02bcd5d [Lee moon soo] Update SparkSqlInterpreterTest
f06a2fa [Lee moon soo] Spark 2.0 support

Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/8546666d
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/8546666d
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/8546666d

Branch: refs/heads/master
Commit: 8546666d5d7069f9a8d18664093362edb0d94a5a
Parents: 483a897
Author: Lee moon soo <[email protected]>
Authored: Sat Jul 23 08:03:58 2016 +0900
Committer: Lee moon soo <[email protected]>
Committed: Sun Jul 24 09:10:28 2016 +0900

----------------------------------------------------------------------
 .travis.yml                                     |  10 +-
 README.md                                       |  20 +-
 docs/install/install.md                         |   7 +-
 r/pom.xml                                       |   2 -
 spark-dependencies/pom.xml                      |  40 +--
 .../zeppelin/spark/PySparkInterpreter.java      |   9 +
 .../apache/zeppelin/spark/SparkInterpreter.java | 247 +++++++++++++++----
 .../java/org/apache/zeppelin/spark/Utils.java   |   9 +
 .../apache/zeppelin/spark/ZeppelinContext.java  |   2 -
 .../main/resources/python/zeppelin_pyspark.py   |  15 +-
 .../zeppelin/spark/SparkInterpreterTest.java    |  12 +
 .../zeppelin/spark/SparkSqlInterpreterTest.java |   6 +-
 zeppelin-distribution/src/bin_license/LICENSE   |  54 ++--
 .../zeppelin/rest/AbstractTestRestApi.java      |  21 +-
 .../zeppelin/rest/ZeppelinSparkClusterTest.java |  33 ++-
 15 files changed, 386 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8546666d/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index d4adc0d..1d25a67 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -33,13 +33,17 @@ addons:

 matrix:
   include:
+    # Test all modules with spark-2.0.0-preview and scala 2.11
+    - jdk: "oraclejdk7"
SPARK_VER="2.0.0-preview" HADOOP_VER="2.3" PROFILE="-Pspark-2.0 -Dspark.version=2.0.0-preview -Phadoop-2.3 -Ppyspark -Psparkr -Pscalding -Pexamples -Pscala-2.11" BUILD_FLAG="package -Pbuild-distr" TEST_FLAG="verify -Pusing-packaged-distr" TEST_PROJECTS="" + # Test all modules with scala 2.10 - jdk: "oraclejdk7" - env: SCALA_VER="2.10" SPARK_VER="1.6.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.6 -Pr -Phadoop-2.3 -Ppyspark -Psparkr -Pscalding -Pexamples" BUILD_FLAG="package -Dscala-2.10 -Pbuild-distr" TEST_FLAG="verify -Pusing-packaged-distr" TEST_PROJECTS="" + env: SCALA_VER="2.10" SPARK_VER="1.6.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.6 -Pr -Phadoop-2.3 -Ppyspark -Psparkr -Pscalding -Pexamples" BUILD_FLAG="package -Pbuild-distr" TEST_FLAG="verify -Pusing-packaged-distr" TEST_PROJECTS="" # Test all modules with scala 2.11 - jdk: "oraclejdk7" - env: SCALA_VER="2.11" SPARK_VER="1.6.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.6 -Pr -Phadoop-2.3 -Ppyspark -Psparkr -Pscalding -Pexamples -Pscala-2.11" BUILD_FLAG="package -Dscala-2.11 -Pbuild-distr" TEST_FLAG="verify -Pusing-packaged-distr" TEST_PROJECTS="" + env: SCALA_VER="2.11" SPARK_VER="1.6.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.6 -Pr -Phadoop-2.3 -Ppyspark -Psparkr -Pscalding -Pexamples -Pscala-2.11" BUILD_FLAG="package -Pbuild-distr" TEST_FLAG="verify -Pusing-packaged-distr" TEST_PROJECTS="" # Test spark module for 1.5.2 - jdk: "oraclejdk7" @@ -96,6 +100,8 @@ after_failure: - cat zeppelin-distribution/target/zeppelin-*-SNAPSHOT/zeppelin-*-SNAPSHOT/logs/zeppelin*.log - cat zeppelin-distribution/target/zeppelin-*-SNAPSHOT/zeppelin-*-SNAPSHOT/logs/zeppelin*.out - cat zeppelin-web/npm-debug.log + - cat spark-*/logs/* after_script: - ./testing/stopSparkCluster.sh $SPARK_VER $HADOOP_VER + http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8546666d/README.md ---------------------------------------------------------------------- diff --git a/README.md b/README.md index a90faca..d702467 100644 --- a/README.md +++ b/README.md @@ -124,6 +124,7 @@ Set spark major version Available profiles are ``` +-Pspark-2.0 -Pspark-1.6 -Pspark-1.5 -Pspark-1.4 @@ -157,6 +158,16 @@ Available profiles are minor version can be adjusted by `-Dhadoop.version=x.x.x` +##### `-Pscala-[version] (optional)` + +set scala version (default 2.10) +Available profiles are + +``` +-Pscala-2.10 +-Pscala-2.11 +``` + ##### `-Pyarn` (optional) enable YARN support for local mode @@ -199,14 +210,17 @@ Available profiles are Bulid examples under zeppelin-examples directory - +#### Example Here're some examples: ```sh -# basic build -mvn clean package -Pspark-1.6 -Phadoop-2.4 -Pyarn -Ppyspark +# build with spark-2.0, scala-2.11 +mvn clean package -Pspark-2.0 -Phadoop-2.4 -Pyarn -Ppyspark -Psparkr -Pscala-2.11 + +# build with spark-1.6, scala-2.10 +mvn clean package -Pspark-1.6 -Phadoop-2.4 -Pyarn -Ppyspark -Psparkr # spark-cassandra integration mvn clean package -Pcassandra-spark-1.5 -Dhadoop.version=2.6.0 -Phadoop-2.6 -DskipTests http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8546666d/docs/install/install.md ---------------------------------------------------------------------- diff --git a/docs/install/install.md b/docs/install/install.md index bcc93da..98adc74 100644 --- a/docs/install/install.md +++ b/docs/install/install.md @@ -93,8 +93,11 @@ mvn clean package -DskipTests [Options] Here are some examples with several options ``` -# basic build -mvn clean package -Pspark-1.6 -Phadoop-2.4 -Pyarn -Ppyspark +# build with spark-2.0, scala-2.11 +mvn clean package -Pspark-2.0 -Phadoop-2.4 -Pyarn 
-Ppyspark -Psparkr -Pscala-2.11 + +# build with spark-1.6, scala-2.10 +mvn clean package -Pspark-1.6 -Phadoop-2.4 -Pyarn -Ppyspark -Psparkr # spark-cassandra integration mvn clean package -Pcassandra-spark-1.5 -Dhadoop.version=2.6.0 -Phadoop-2.6 -DskipTests http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8546666d/r/pom.xml ---------------------------------------------------------------------- diff --git a/r/pom.xml b/r/pom.xml index 1b71b7e..0e7d78d 100644 --- a/r/pom.xml +++ b/r/pom.xml @@ -383,7 +383,6 @@ <property><name>!scala-2.11</name></property> </activation> <properties> - <spark.version>1.6.1</spark.version> <extra.source.dir>src/main/scala-2.10</extra.source.dir> <extra.testsource.dir>src/test/scala-2.10</extra.testsource.dir> </properties> @@ -395,7 +394,6 @@ <property><name>scala-2.11</name></property> </activation> <properties> - <spark.version>1.6.1</spark.version> <extra.source.dir>src/main/scala-2.11</extra.source.dir> <extra.testsource.dir>src/test/scala/scala-2.11</extra.testsource.dir> </properties> http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8546666d/spark-dependencies/pom.xml ---------------------------------------------------------------------- diff --git a/spark-dependencies/pom.xml b/spark-dependencies/pom.xml index 754935d..5c3eb08 100644 --- a/spark-dependencies/pom.xml +++ b/spark-dependencies/pom.xml @@ -36,8 +36,16 @@ <url>http://zeppelin.apache.org</url> <properties> - <spark.version>1.4.1</spark.version> + <!-- library version defined in this section brought from spark 1.4.1 and it's dependency. + Therefore changing only spark.version is not going to be enough when this module + support new version of spark to make the new version as default supported version. + + Each profile (spark-2.0, spark-1.6, etc) will overrides necessary dependency version. + So we'll make one of those profile 'activateByDefault' to make it default supported version + instead of changing spark.version in this section. 
+ --> + <spark.version>1.4.1</spark.version> <hadoop.version>2.3.0</hadoop.version> <yarn.version>${hadoop.version}</yarn.version> <avro.version>1.7.7</avro.version> @@ -285,12 +293,6 @@ <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming-twitter_${scala.binary.version}</artifactId> - <version>${spark.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.spark</groupId> <artifactId>spark-catalyst_${scala.binary.version}</artifactId> <version>${spark.version}</version> </dependency> @@ -345,14 +347,6 @@ <profiles> <profile> - <id>scala-2.11</id> - <properties> - <spark.version>1.6.1</spark.version> - <spark.download.url>http://archive.apache.org/dist/spark/spark-${spark.version}/spark-${spark.version}.tgz</spark.download.url> - </properties> - </profile> - - <profile> <id>spark-1.1</id> <dependencies> @@ -517,9 +511,6 @@ <profile> <id>spark-1.6</id> - <activation> - <activeByDefault>true</activeByDefault> - </activation> <properties> <spark.version>1.6.1</spark.version> <py4j.version>0.9</py4j.version> @@ -530,6 +521,19 @@ </profile> <profile> + <id>spark-2.0</id> + <activation> + <activeByDefault>true</activeByDefault> + </activation> + <properties> + <spark.version>2.0.0</spark.version> + <protobuf.version>2.5.0</protobuf.version> + <py4j.version>0.10.1</py4j.version> + <scala.version>2.11.8</scala.version> + </properties> + </profile> + + <profile> <id>hadoop-0.23</id> <!-- SPARK-1121: Adds an explicit dependency on Avro to work around a Hadoop 0.23.X issue --> http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8546666d/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java index 98fb834..f63f3d4 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java @@ -528,6 +528,15 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand } } + public Object getSparkSession() { + SparkInterpreter intp = getSparkInterpreter(); + if (intp == null) { + return null; + } else { + return intp.getSparkSession(); + } + } + public SparkConf getSparkConf() { JavaSparkContext sc = getJavaSparkContext(); if (sc == null) { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8546666d/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java index 8a7e4c9..4707611 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java @@ -32,9 +32,6 @@ import java.util.concurrent.atomic.AtomicInteger; import com.google.common.base.Joiner; -import com.google.common.reflect.TypeToken; -import com.google.gson.Gson; -import org.apache.spark.HttpServer; import org.apache.spark.SparkConf; import org.apache.spark.SparkContext; import org.apache.spark.SparkEnv; @@ -99,9 +96,11 @@ public class SparkInterpreter extends Interpreter { * intp - scala.tools.nsc.interpreter.IMain; (scala 2.11) */ private Object intp; + private SparkConf conf; private static SparkContext sc; private static SQLContext sqlc; 
private static SparkEnv env; + private static Object sparkSession; // spark 2.x private static JobProgressListener sparkListener; private static AbstractFile classOutputDir; private static Integer sharedInterpreterLock = new Integer(0); @@ -118,7 +117,7 @@ public class SparkInterpreter extends Interpreter { private Map<String, Object> binder; private SparkVersion sparkVersion; private File outputDir; // class outputdir for scala 2.11 - private HttpServer classServer; // classserver for scala 2.11 + private Object classServer; // classserver for scala 2.11 public SparkInterpreter(Properties property) { @@ -194,35 +193,80 @@ public class SparkInterpreter extends Interpreter { return java.lang.Boolean.parseBoolean(getProperty("zeppelin.spark.useHiveContext")); } + /** + * See org.apache.spark.sql.SparkSession.hiveClassesArePresent + * @return + */ + private boolean hiveClassesArePresent() { + try { + this.getClass().forName("org.apache.spark.sql.hive.HiveSessionState"); + this.getClass().forName("org.apache.spark.sql.hive.HiveSharedState"); + this.getClass().forName("org.apache.hadoop.hive.conf.HiveConf"); + return true; + } catch (ClassNotFoundException | NoClassDefFoundError e) { + return false; + } + } + private boolean importImplicit() { return java.lang.Boolean.parseBoolean(getProperty("zeppelin.spark.importImplicit")); } + public Object getSparkSession() { + synchronized (sharedInterpreterLock) { + if (sparkSession == null) { + createSparkSession(); + } + return sparkSession; + } + } + public SQLContext getSQLContext() { synchronized (sharedInterpreterLock) { + if (Utils.isSpark2()) { + return getSQLContext_2(); + } else { + return getSQLContext_1(); + } + } + } + + /** + * Get SQLContext for spark 2.x + */ + private SQLContext getSQLContext_2() { + if (sqlc == null) { + sqlc = (SQLContext) Utils.invokeMethod(sparkSession, "wrapped"); if (sqlc == null) { - if (useHiveContext()) { - String name = "org.apache.spark.sql.hive.HiveContext"; - Constructor<?> hc; - try { - hc = getClass().getClassLoader().loadClass(name) - .getConstructor(SparkContext.class); - sqlc = (SQLContext) hc.newInstance(getSparkContext()); - } catch (NoSuchMethodException | SecurityException - | ClassNotFoundException | InstantiationException - | IllegalAccessException | IllegalArgumentException - | InvocationTargetException e) { - logger.warn("Can't create HiveContext. Fallback to SQLContext", e); - // when hive dependency is not loaded, it'll fail. - // in this case SQLContext can be used. - sqlc = new SQLContext(getSparkContext()); - } - } else { + sqlc = (SQLContext) Utils.invokeMethod(sparkSession, "sqlContext"); + } + } + return sqlc; + } + + public SQLContext getSQLContext_1() { + if (sqlc == null) { + if (useHiveContext()) { + String name = "org.apache.spark.sql.hive.HiveContext"; + Constructor<?> hc; + try { + hc = getClass().getClassLoader().loadClass(name) + .getConstructor(SparkContext.class); + sqlc = (SQLContext) hc.newInstance(getSparkContext()); + } catch (NoSuchMethodException | SecurityException + | ClassNotFoundException | InstantiationException + | IllegalAccessException | IllegalArgumentException + | InvocationTargetException e) { + logger.warn("Can't create HiveContext. Fallback to SQLContext", e); + // when hive dependency is not loaded, it'll fail. + // in this case SQLContext can be used. 
+          sqlc = new SQLContext(getSparkContext());
         }
+      } else {
+        sqlc = new SQLContext(getSparkContext());
       }
-      return sqlc;
     }
+    return sqlc;
   }
@@ -250,7 +294,80 @@ public class SparkInterpreter extends Interpreter {
     return (DepInterpreter) p;
   }

+  /**
+   * Spark 2.x
+   * Create SparkSession
+   */
+  public Object createSparkSession() {
+    logger.info("------ Create new SparkSession {} -------", getProperty("master"));
+    String execUri = System.getenv("SPARK_EXECUTOR_URI");
+    conf.setAppName(getProperty("spark.app.name"));
+
+    conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath());
+
+    if (execUri != null) {
+      conf.set("spark.executor.uri", execUri);
+    }
+
+    if (System.getenv("SPARK_HOME") != null) {
+      conf.setSparkHome(System.getenv("SPARK_HOME"));
+    }
+
+    conf.set("spark.scheduler.mode", "FAIR");
+    conf.setMaster(getProperty("master"));
+
+    Properties intpProperty = getProperty();
+
+    for (Object k : intpProperty.keySet()) {
+      String key = (String) k;
+      String val = toString(intpProperty.get(key));
+      if (!key.startsWith("spark.") || !val.trim().isEmpty()) {
+        logger.debug(String.format("SparkConf: key = [%s], value = [%s]", key, val));
+        conf.set(key, val);
+      }
+    }
+
+    Class SparkSession = Utils.findClass("org.apache.spark.sql.SparkSession");
+    Object builder = Utils.invokeStaticMethod(SparkSession, "builder");
+    Utils.invokeMethod(builder, "config", new Class[]{ SparkConf.class }, new Object[]{ conf });
+
+    if (useHiveContext()) {
+      if (hiveClassesArePresent()) {
+        Utils.invokeMethod(builder, "enableHiveSupport");
+        sparkSession = Utils.invokeMethod(builder, "getOrCreate");
+        logger.info("Created Spark session with Hive support");
+      } else {
+        Utils.invokeMethod(builder, "config",
+            new Class[]{ String.class, String.class},
+            new Object[]{ "spark.sql.catalogImplementation", "in-memory"});
+        sparkSession = Utils.invokeMethod(builder, "getOrCreate");
+        logger.info("Created Spark session (Hive classes are not present, using in-memory catalog)");
+      }
+    } else {
+      sparkSession = Utils.invokeMethod(builder, "getOrCreate");
+      logger.info("Created Spark session");
+    }
+
+    return sparkSession;
+  }
+
   public SparkContext createSparkContext() {
+    if (Utils.isSpark2()) {
+      return createSparkContext_2();
+    } else {
+      return createSparkContext_1();
+    }
+  }
+
+  /**
+   * Create SparkContext for spark 2.x
+   * @return the SparkContext of the shared SparkSession
+   */
+  private SparkContext createSparkContext_2() {
+    return (SparkContext) Utils.invokeMethod(sparkSession, "sparkContext");
+  }
+
+  public SparkContext createSparkContext_1() {
     logger.info("------ Create new SparkContext {} -------", getProperty("master"));
     String execUri = System.getenv("SPARK_EXECUTOR_URI");

@@ -267,8 +384,8 @@ public class SparkInterpreter extends Interpreter {
     try {
       // in case of spark 1.1x, spark 1.2x
       Method classServer = intp.getClass().getMethod("classServer");
-      HttpServer httpServer = (HttpServer) classServer.invoke(intp);
-      classServerUri = httpServer.uri();
+      Object httpServer = classServer.invoke(intp);
+      classServerUri = (String) Utils.invokeMethod(httpServer, "uri");
     } catch (NoSuchMethodException | SecurityException | IllegalAccessException
         | IllegalArgumentException | InvocationTargetException e) {
       // continue
@@ -290,14 +407,12 @@ public class SparkInterpreter extends Interpreter {

     if (Utils.isScala2_11()) {
       classServer = createHttpServer(outputDir);
-      classServer.start();
-      classServerUri = classServer.uri();
+      Utils.invokeMethod(classServer, "start");
+      classServerUri = (String) Utils.invokeMethod(classServer, "uri");
     }

-    SparkConf conf =
-      new SparkConf()
-        .setMaster(getProperty("master"))
-        .setAppName(getProperty("spark.app.name"));
+    conf.setMaster(getProperty("master"))
+        .setAppName(getProperty("spark.app.name"));

     if (classServerUri != null) {
       conf.set("spark.repl.class.uri", classServerUri);
@@ -409,6 +524,7 @@

   @Override
   public void open() {
+    conf = new SparkConf();
     URL[] urls = getClassloaderUrls();

     // Very nice discussion about how scala compiler handles classpath
@@ -535,7 +651,19 @@
     b.v_$eq(true);
     settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(b);

-    System.setProperty("scala.repl.name.line", "line" + this.hashCode() + "$");
+    /* Required for scoped mode.
+     * In scoped mode, multiple scala compiler (repl) instances generate classes in the
+     * same directory. Class names are not randomly generated and look like
+     * '$line12.$read$$iw$$iw', so a class generated by one repl can conflict with
+     * (overwrite) a class generated by another repl.
+     *
+     * To prevent such conflicts, change the prefix of the generated class names for
+     * each scala compiler (repl) instance.
+     *
+     * In Spark 2.x, the repl-generated wrapper class name should be compatible with the
+     * pattern ^(\$line(?:\d+)\.\$read)(?:\$\$iw)+$
+     */
+    System.setProperty("scala.repl.name.line", "$line" + this.hashCode());

     // To prevent 'File name too long' error on some file systems.
     MutableSettings.IntSetting numClassFileSetting = settings.maxClassfileName();
@@ -582,6 +710,9 @@
             new Object[]{intp});
       }

+      if (Utils.isSpark2()) {
+        sparkSession = getSparkSession();
+      }
       sc = getSparkContext();
       if (sc.getPoolForName("fair").isEmpty()) {
         Value schedulingMode = org.apache.spark.scheduler.SchedulingMode.FAIR();
@@ -611,6 +742,10 @@
       binder.put("sqlc", sqlc);
       binder.put("z", z);

+      if (Utils.isSpark2()) {
+        binder.put("spark", sparkSession);
+      }
+
       interpret("@transient val z = "
          + "_binder.get(\"z\").asInstanceOf[org.apache.zeppelin.spark.ZeppelinContext]");
       interpret("@transient val sc = "
          + "_binder.get(\"sc\").asInstanceOf[org.apache.spark.SparkContext]");
       interpret("@transient val sqlc = "
          + "_binder.get(\"sqlc\").asInstanceOf[org.apache.spark.sql.SQLContext]");
       interpret("@transient val sqlContext = "
          + "_binder.get(\"sqlc\").asInstanceOf[org.apache.spark.sql.SQLContext]");
+
+      if (Utils.isSpark2()) {
+        interpret("@transient val spark = "
+            + "_binder.get(\"spark\").asInstanceOf[org.apache.spark.sql.SparkSession]");
+      }
+
       interpret("import org.apache.spark.SparkContext._");

       if (importImplicit()) {
-        if (sparkVersion.oldSqlContextImplicits()) {
-          interpret("import sqlContext._");
-        } else {
-          interpret("import sqlContext.implicits._");
-          interpret("import sqlContext.sql");
+        if (Utils.isSpark2()) {
+          interpret("import spark.implicits._");
+          interpret("import spark.sql");
           interpret("import org.apache.spark.sql.functions._");
+        } else {
+          if (sparkVersion.oldSqlContextImplicits()) {
+            interpret("import sqlContext._");
+          } else {
+            interpret("import sqlContext.implicits._");
+            interpret("import sqlContext.sql");
+            interpret("import org.apache.spark.sql.functions._");
+          }
         }
       }
     }
@@ -825,6 +972,9 @@

   public Object getLastObject() {
     IMain.Request r = (IMain.Request) Utils.invokeMethod(intp, "lastRequest");
+    if (r == null || r.lineRep() == null) {
+      return null;
+    }
     Object obj = r.lineRep().call("$result",
         JavaConversions.asScalaBuffer(new LinkedList<Object>()));
     return obj;
@@ -955,7
+1105,18 @@ public class SparkInterpreter extends Interpreter { return; } - Object lastObj = getValue(varName); + Object lastObj = null; + try { + if (Utils.isScala2_10()) { + lastObj = getValue(varName); + } else { + lastObj = getLastObject(); + } + } catch (NullPointerException e) { + // Some case, scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call throws an NPE + logger.error(e.getMessage(), e); + } + if (lastObj != null) { ResourcePool resourcePool = context.getResourcePool(); resourcePool.put(context.getNoteId(), context.getParagraphId(), @@ -1100,7 +1261,7 @@ public class SparkInterpreter extends Interpreter { sc.stop(); sc = null; if (classServer != null) { - classServer.stop(); + Utils.invokeMethod(classServer, "stop"); classServer = null; } } @@ -1153,16 +1314,16 @@ public class SparkInterpreter extends Interpreter { return file; } - private HttpServer createHttpServer(File outputDir) { + private Object createHttpServer(File outputDir) { SparkConf conf = new SparkConf(); try { // try to create HttpServer Constructor<?> constructor = getClass().getClassLoader() - .loadClass(HttpServer.class.getName()) + .loadClass("org.apache.spark.HttpServer") .getConstructor(new Class[]{ SparkConf.class, File.class, SecurityManager.class, int.class, String.class}); - return (HttpServer) constructor.newInstance(new Object[] { + return constructor.newInstance(new Object[] { conf, outputDir, new SecurityManager(conf), 0, "HTTP Server"}); } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException e) { @@ -1170,10 +1331,10 @@ public class SparkInterpreter extends Interpreter { Constructor<?> constructor = null; try { constructor = getClass().getClassLoader() - .loadClass(HttpServer.class.getName()) + .loadClass("org.apache.spark.HttpServer") .getConstructor(new Class[]{ File.class, SecurityManager.class, int.class, String.class}); - return (HttpServer) constructor.newInstance(new Object[] { + return constructor.newInstance(new Object[] { outputDir, new SecurityManager(conf), 0, "HTTP Server"}); } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException e1) { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8546666d/spark/src/main/java/org/apache/zeppelin/spark/Utils.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/Utils.java b/spark/src/main/java/org/apache/zeppelin/spark/Utils.java index 940e202..328fa19 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/Utils.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/Utils.java @@ -89,4 +89,13 @@ class Utils { static boolean isScala2_11() { return !isScala2_10(); } + + static boolean isSpark2() { + try { + Utils.class.forName("org.apache.spark.sql.SparkSession"); + return true; + } catch (ClassNotFoundException e) { + return false; + } + } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8546666d/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java index bd8f0a1..bfd2b46 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java @@ -31,7 +31,6 @@ import java.util.List; import 
org.apache.spark.SparkContext; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.catalyst.expressions.Attribute; -import org.apache.spark.sql.hive.HiveContext; import org.apache.zeppelin.annotation.ZeppelinApi; import org.apache.zeppelin.display.AngularObject; import org.apache.zeppelin.display.AngularObjectRegistry; @@ -70,7 +69,6 @@ public class ZeppelinContext { public SparkContext sc; public SQLContext sqlContext; - public HiveContext hiveContext; private GUI gui; @ZeppelinApi http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8546666d/spark/src/main/resources/python/zeppelin_pyspark.py ---------------------------------------------------------------------- diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py index 0ea5474..0380afa 100644 --- a/spark/src/main/resources/python/zeppelin_pyspark.py +++ b/spark/src/main/resources/python/zeppelin_pyspark.py @@ -29,7 +29,7 @@ from pyspark.broadcast import Broadcast from pyspark.serializers import MarshalSerializer, PickleSerializer # for back compatibility -from pyspark.sql import SQLContext, HiveContext, SchemaRDD, Row +from pyspark.sql import SQLContext, HiveContext, Row class Logger(object): def __init__(self): @@ -107,6 +107,7 @@ class PyZeppelinContext(dict): class SparkVersion(object): SPARK_1_4_0 = 140 SPARK_1_3_0 = 130 + SPARK_2_0_0 = 200 def __init__(self, versionNumber): self.version = versionNumber @@ -117,6 +118,9 @@ class SparkVersion(object): def isImportAllPackageUnderSparkSql(self): return self.version >= self.SPARK_1_3_0 + def isSpark2(self): + return self.version >= self.SPARK_2_0_0 + class PySparkCompletion: def __init__(self, interpreterObject): self.interpreterObject = interpreterObject @@ -175,6 +179,12 @@ sys.stderr = output client = GatewayClient(port=int(sys.argv[1])) sparkVersion = SparkVersion(int(sys.argv[2])) +if sparkVersion.isSpark2(): + from pyspark.sql import SparkSession +else: + from pyspark.sql import SchemaRDD + + if sparkVersion.isAutoConvertEnabled(): gateway = JavaGateway(client, auto_convert = True) else: @@ -209,6 +219,9 @@ sc = SparkContext(jsc=jsc, gateway=gateway, conf=conf) sqlc = SQLContext(sc, intp.getSQLContext()) sqlContext = sqlc +if sparkVersion.isSpark2(): + spark = SparkSession(sc, intp.getSparkSession()) + completion = PySparkCompletion(intp) z = PyZeppelinContext(intp.getZeppelinContext()) http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8546666d/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java ---------------------------------------------------------------------- diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java index 815e77f..88208ab 100644 --- a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java +++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java @@ -30,6 +30,7 @@ import org.apache.spark.SparkContext; import org.apache.spark.repl.SparkILoop; import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.resource.LocalResourcePool; +import org.apache.zeppelin.resource.WellKnownResourceName; import org.apache.zeppelin.user.AuthenticationInfo; import org.apache.zeppelin.display.GUI; import org.apache.zeppelin.interpreter.*; @@ -177,6 +178,17 @@ public class SparkInterpreterTest { } @Test + public void testCreateDataFrame() { + repl.interpret("case class Person(name:String, age:Int)\n", context); + 
repl.interpret("val people = sc.parallelize(Seq(Person(\"moon\", 33), Person(\"jobs\", 51), Person(\"gates\", 51), Person(\"park\", 34)))\n", context); + repl.interpret("people.toDF.count", context); + assertEquals(new Long(4), context.getResourcePool().get( + context.getNoteId(), + context.getParagraphId(), + WellKnownResourceName.ZeppelinReplResult.toString()).get()); + } + + @Test public void testSparkSql(){ repl.interpret("case class Person(name:String, age:Int)\n", context); repl.interpret("val people = sc.parallelize(Seq(Person(\"moon\", 33), Person(\"jobs\", 51), Person(\"gates\", 51), Person(\"park\", 34)))\n", context); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8546666d/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java ---------------------------------------------------------------------- diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java index 73d5e8a..badd040 100644 --- a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java +++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java @@ -159,11 +159,13 @@ public class SparkSqlInterpreterTest { repl.interpret( "val raw = csv.map(_.split(\",\")).map(p => Row(p(0),toInt(p(1)),p(2)))", context); - repl.interpret("val people = z.sqlContext.applySchema(raw, schema)", - context); if (isDataFrameSupported()) { + repl.interpret("val people = z.sqlContext.createDataFrame(raw, schema)", + context); repl.interpret("people.toDF.registerTempTable(\"people\")", context); } else { + repl.interpret("val people = z.sqlContext.applySchema(raw, schema)", + context); repl.interpret("people.registerTempTable(\"people\")", context); } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8546666d/zeppelin-distribution/src/bin_license/LICENSE ---------------------------------------------------------------------- diff --git a/zeppelin-distribution/src/bin_license/LICENSE b/zeppelin-distribution/src/bin_license/LICENSE index ba567ad..52bd82f 100644 --- a/zeppelin-distribution/src/bin_license/LICENSE +++ b/zeppelin-distribution/src/bin_license/LICENSE @@ -30,7 +30,7 @@ The following components are provided under Apache License. (Apache 2.0) Apache Maven scm api (org.apache.maven.scm:maven-scm-api:jar:1.4 - https://maven.apache.org/scm/) (Apache 2.0) Apache Maven scm provider svnexe (org.apache.maven.scm:maven-scm-provider-svnexe:jar:1.4 - http://maven.apache.org/scm/maven-scm-providers/maven-scm-providers-svn/maven-scm-provider-svnexe/index.html/) (Apache 2.0) Apache Maven scm provider common (org.apache.maven.scm:maven-scm-provider-svn-commons:jar:1.4 - http://maven.apache.org/scm/maven-scm-providers/maven-scm-providers-svn/maven-scm-provider-svn-commons/) - (Apache 2.0) Apache Spark (org.apache.spark:spark:1.5.1) - http://spark.apache.org + (Apache 2.0) Apache Spark (http://spark.apache.org) (Apache 2.0) Apache Hadoop (http://hadoop.apache.org) (Apache 2.0) Apache Avro (org.apache.avro:avro:1.7.7 - http://avro.apache.org) (Apache 2.0) Apache Curator (org.apache.curator:curator:2.4.0 - http://curator.apache.org/) @@ -44,14 +44,14 @@ The following components are provided under Apache License. 
(Apache 2.0) Apache Thrift (http://thrift.apache.org/) (Apache 2.0) Apache Lucene (https://lucene.apache.org/) (Apache 2.0) Apache Zookeeper (org.apache.zookeeper:zookeeper:jar:3.4.5 - http://zookeeper.apache.org/) - (Apache 2.0) Chill (com.twitter:chill-java:jar:0.5.0 - https://github.com/twitter/chill/) + (Apache 2.0) Chill (com.twitter:chill-java:jar:0.8.0 - https://github.com/twitter/chill/) (Apache 2.0) Codehaus Plexus (org.codehaus.plexus:plexus:jar:1.5.6 - https://codehaus-plexus.github.io/) (Apache 2.0) findbugs jsr305 (com.google.code.findbugs:jsr305:jar:1.3.9 - http://findbugs.sourceforge.net/) (Apache 2.0) Google Guava (com.google.guava:guava:15.0 - https://code.google.com/p/guava-libraries/) (Apache 2.0) Jackson (com.fasterxml.jackson.core:jackson-core:2.5.3 - https://github.com/FasterXML/jackson-core) - (Apache 2.0) Jackson (com.fasterxml.jackson.core:jackson-annotations:2.5.0 - https://github.com/FasterXML/jackson-core) + (Apache 2.0) Jackson (com.fasterxml.jackson.core:jackson-annotations:2.5.3 - https://github.com/FasterXML/jackson-core) (Apache 2.0) Jackson (com.fasterxml.jackson.core:jackson-databind:2.5.3 - https://github.com/FasterXML/jackson-core) - (Apache 2.0) javax.servlet (org.eclipse.jetty.orbit:javax.servlet:jar:3.0.0.v201112011016 - http://www.eclipse.org/jetty) + (Apache 2.0) javax.servlet (org.eclipse.jetty.orbit:javax.servlet:jar:3.1.0.v201112011016 - http://www.eclipse.org/jetty) (Apache 2.0) Joda-Time (joda-time:joda-time:2.8.1 - http://www.joda.org/joda-time/) (Apache 2.0) Jackson (org.codehaus.jackson:jackson-core-asl:1.9.13 - http://jackson.codehaus.org/) (Apache 2.0) Javassist (org.javassist:javassist:jar:3.18.1-GA:compile - http://jboss-javassist.github.io/javassist/) @@ -62,9 +62,9 @@ The following components are provided under Apache License. (Apache 2.0) xml apis (xml-apis:xml-apis:jar:1.4.01 - http://xerces.apache.org/xml-commons/components/external) (Apache 2.0) java-xmlbuilder (com.jamesmurty.utils:java-xmlbuilder:jar:1.0 - https://github.com/jmurty/java-xmlbuilder) (Apache 2.0) compress-lzf (com.ning:compress-lzf:jar:1.0.3 - https://github.com/ning/compress) Copyright 2009-2010 Ning, Inc. - (Apache 2.0) Snappy-java (org.xerial.snappy:snappy-java:1.1.1.7 - https://github.com/xerial/snappy-java/) + (Apache 2.0) Snappy-java (org.xerial.snappy:snappy-java:1.1.2.4 - https://github.com/xerial/snappy-java/) (Apache 2.0) lz4-java (net.jpountz.lz4:lz4:jar:1.3.0 - https://github.com/jpountz/lz4-java) - (Apache 2.0) RoaringBitmap (org.roaringbitmap:RoaringBitmap:jar:0.4.5 - https://github.com/lemire/RoaringBitmap) + (Apache 2.0) RoaringBitmap (org.roaringbitmap:RoaringBitmap:jar:0.5.11 - https://github.com/lemire/RoaringBitmap) (Apache 2.0) json4s (org.json4s:json4s-ast_2.10:jar:3.2.10 - https://github.com/json4s/json4s) (Apache 2.0) HPPC Collections (com.carrotsearch:hppc:0.7.1 - http://labs.carrotsearch.com/hppc.html/hppc) (Apache 2.0) Jackson-dataformat-CBOR (com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:2.6.2 - http://wiki.fasterxml.com/JacksonForCbor) @@ -73,7 +73,7 @@ The following components are provided under Apache License. 
(Apache 2.0) json-flattener (com.github.wnameless:json-flattener:0.1.6 - https://github.com/wnameless/json-flattener) (Apache 2.0) Spatial4J (com.spatial4j:spatial4j:0.4.1 - https://github.com/spatial4j/spatial4j) (Apache 2.0) T-Digest (com.tdunning:t-digest:3.0 - https://github.com/tdunning/t-digest) - (Apache 2.0) Netty (io.netty:netty:3.10.5.Final - http://netty.io/) + (Apache 2.0) Netty (io.netty:netty:3.8.0.Final - http://netty.io/) (Apache 2.0) Lucene Common Analyzers (org.apache.lucene:lucene-analyzers-common:5.3.1 - http://lucene.apache.org/lucene-parent/lucene-analyzers-common) (Apache 2.0) Lucene Memory (org.apache.lucene:lucene-backward-codecs:5.3.1 - http://lucene.apache.org/lucene-parent/lucene-backward-codecs) (Apache 2.0) Lucene Core (org.apache.lucene:lucene-core:5.3.1 - http://lucene.apache.org/lucene-parent/lucene-core) @@ -93,13 +93,15 @@ The following components are provided under Apache License. (Apache 2.0) Shiro Core (org.apache.shiro:shiro-core:1.2.3 - https://shiro.apache.org) (Apache 2.0) Shiro Web (org.apache.shiro:shiro-web:1.2.3 - https://shiro.apache.org) (Apache 2.0) SnakeYAML (org.yaml:snakeyaml:1.15 - http://www.snakeyaml.org) - (Apache 2.0) Protocol Buffers (com.google.protobuf:protobuf-java:2.4.1 - https://github.com/google/protobuf/releases) + (Apache 2.0) Protocol Buffers (com.google.protobuf:protobuf-java:2.5.0 - https://github.com/google/protobuf/releases) (Apache 2.0) Alluxio Shell (org.alluxio:alluxio-shell:1.0.0 - http://alluxio.org) (Apache 2.0) Alluxio Servers (org.alluxio:alluxio-core-server:1.0.0 - http://alluxio.org) (Apache 2.0) Alluxio Minicluster (org.alluxio:alluxio-minicluster:1.0.0 - http://alluxio.org) (Apache 2.0) Alluxio Underfs Local (org.alluxio:alluxio-underfs-local:1.0.0 - http://alluxio.org) (Apache 2.0) Microsoft Azure Storage Library for Java (com.microsoft.azure:azure-storage:4.0.0 - https://github.com/Azure/azure-storage-java) (Apache 2.0) Roboto Font (https://github.com/google/roboto/) + (Apache 2.0) stream (com.clearspring.analytics:stream:2.7.0) - https://github.com/addthis/stream-lib/blob/v2.7.0/LICENSE.txt + (Apache 2.0) io.dropwizard.metrics:3.1.2 - https://github.com/dropwizard/metrics/blob/v3.1.2/LICENSE ======================================================================== @@ -141,10 +143,11 @@ The text of each license is also included at licenses/LICENSE-[project]-[version The following components are provided under the MIT License. 
(The MIT License) Objenesis (org.objenesis:objenesis:2.1 - https://github.com/easymock/objenesis) - Copyright (c) 2006-2015 the original author and authors - (The MIT License) JCL 1.1.1 implemented over SLF4J (org.slf4j:jcl-over-slf4j:1.7.5 - http://www.slf4j.org) - (The MIT License) JUL to SLF4J bridge (org.slf4j:jul-to-slf4j:1.7.5 - http://www.slf4j.org) + (The MIT License) JCL 1.1.1 implemented over SLF4J (org.slf4j:jcl-over-slf4j:1.7.16 - http://www.slf4j.org) + (The MIT License) JUL to SLF4J bridge (org.slf4j:jul-to-slf4j:1.7.16 - http://www.slf4j.org) (The MIT License) angular-resource (angular-resource - https://github.com/angular/angular.js/tree/master/src/ngResource) (The MIT License) minimal-json (com.eclipsesource.minimal-json:minimal-json:0.9.4 - https://github.com/ralfstx/minimal-json) + (The MIT License) pyrolite (net.razorvine:pyrolite:4.9) - https://github.com/irmen/Pyrolite/blob/v4.9/LICENSE ======================================================================== BSD-style licenses @@ -167,20 +170,31 @@ The text of each license is also included at licenses/LICENSE-[project]-[version The following components are provided under the BSD-style License. (New BSD License) JGit (org.eclipse.jgit:org.eclipse.jgit:jar:4.1.1.201511131810-r - https://eclipse.org/jgit/) - (New BSD License) Kryo (com.esotericsoftware.kryo:kryo:2.21 - http://code.google.com/p/kryo/) - (New BSD License) MinLog (com.esotericsoftware.minlog:minlog:1.2 - http://code.google.com/p/minlog/) + (New BSD License) Kryo (com.esotericsoftware.kryo:kryo:3.0.3 - http://code.google.com/p/kryo/) + (New BSD License) leveldbjni (org.fusesource.leveldbjni:leveldbjni-all:1.8) - https://github.com/fusesource/leveldbjni/blob/leveldbjni-1.8/license.txt + (New BSD License) MinLog (com.esotericsoftware.minlog:minlog:1.3 - http://code.google.com/p/minlog/) (New BSD License) ReflectASM (com.esotericsoftware.reflectasm:reflectasm:1.07 - http://code.google.com/p/reflectasm/) - (BSD-like) Scala Library (org.scala-lang:scala-library:2.10.4 - http://www.scala-lang.org/) - (BSD-like) Scalap (org.scala-lang:scalap:2.10.4 - http://www.scala-lang.org/) - (BSD-like) (The BSD License) jline (org.scala-lang:jline:2.10.4 - http://www.scala-lang.org/) - (BSD-like) Scala Actors library (org.scala-lang:scala-actors:2.10.4 - http://www.scala-lang.org/) - (BSD-like) Scala Compiler (org.scala-lang:scala-compiler:2.10.4 - http://www.scala-lang.org/) - (BSD-like) Scala Compiler (org.scala-lang:scala-reflect:2.10.4 - http://www.scala-lang.org/) + (BSD-like) Scala Library (org.scala-lang:scala-library:2.11.7 - http://www.scala-lang.org/) + (BSD-like) Scalap (org.scala-lang:scalap:2.11.7 - http://www.scala-lang.org/) + (BSD-like) (The BSD License) jline (org.scala-lang:jline:2.11.7 - http://www.scala-lang.org/) + (BSD-like) Scala Actors library (org.scala-lang:scala-actors:2.11.7 - http://www.scala-lang.org/) + (BSD-like) Scala Compiler (org.scala-lang:scala-compiler:2.11.7 - http://www.scala-lang.org/) + (BSD-like) Scala Compiler (org.scala-lang:scala-reflect:2.11.7 - http://www.scala-lang.org/) (BSD-like) ASM (asm:asm:jar:3.1 - http://asm.ow2.org/) - Copyright (c) 2000-2011 INRIA, France Telecom (New BSD License) Py4J (net.sf.py4j:py4j:0.9 - http://py4j.sourceforge.net/) + (New BSD License) Py4J (net.sf.py4j:py4j:0.10.1 - http://py4j.sourceforge.net/) - https://github.com/bartdag/py4j/blob/0.10.1/LICENSE.txt (New BSD License) Markdown4j (org.commonjava.googlecode.markdown4j:markdown4j:jar:2.2-cj-1.0 - https://code.google.com/p/markdown4j/) - - + (BSD 3 
Clause) Paranamer (com.thoughtworks.paranamer:paranamer:jar:2.6) - https://github.com/paul-hammant/paranamer/blob/paranamer-parent-2.6/LICENSE.txt + (BSD 3 Clause) netlib core (com.github.fommil.netlib:core:1.1.2 - https://github.com/fommil/netlib-java/core) + (BSD 3 Clause) JPMML-Model (org.jpmml:pmml-model:1.2.7 - https://github.com/jpmml/jpmml-model) + (BSD License) AntLR Parser Generator (antlr:antlr:2.7.7 - http://www.antlr.org/) + (BSD License) ANTLR 4.5.2-1 (org.antlr:antlr4:4.5.2-1 - http://wwww.antlr.org/) + (BSD licence) ANTLR ST4 4.0.4 (org.antlr:ST4:4.0.4 - http://www.stringtemplate.org) + (BSD licence) ANTLR StringTemplate (org.antlr:stringtemplate:3.2.1 - http://www.stringtemplate.org) + (BSD-style) spire (org.spire-math:spire_2.11:0.7.1 - http://spire-math.org) + (BSD-style) spire-macros (org.spire-math:spire-macros_2.11:0.7.1 - http://spire-math.org) + (The BSD License) Fortran to Java ARPACK (net.sourceforge.f2j:arpack_combined_all:0.1 - http://f2j.sourceforge.net) + (The BSD License) xmlenc Library (xmlenc:xmlenc:0.52 - http://xmlenc.sourceforge.net) ======================================================================== CDDL license http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8546666d/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java index 6cfd961..58af796 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java @@ -26,6 +26,9 @@ import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import org.apache.commons.exec.CommandLine; +import org.apache.commons.exec.DefaultExecutor; +import org.apache.commons.exec.PumpStreamHandler; import org.apache.commons.httpclient.HttpClient; import org.apache.commons.httpclient.HttpMethodBase; import org.apache.commons.httpclient.methods.ByteArrayRequestEntity; @@ -223,7 +226,7 @@ public abstract class AbstractTestRestApi { } private static boolean isActiveSparkHome(File dir) { - if (dir.getName().matches("spark-[0-9\\.]+-bin-hadoop[0-9\\.]+")) { + if (dir.getName().matches("spark-[0-9\\.]+[A-Za-z-]*-bin-hadoop[0-9\\.]+")) { File pidDir = new File(dir, "run"); if (pidDir.isDirectory() && pidDir.listFiles().length > 0) { return true; @@ -415,6 +418,22 @@ public abstract class AbstractTestRestApi { }; } + + public static void ps() { + DefaultExecutor executor = new DefaultExecutor(); + executor.setStreamHandler(new PumpStreamHandler(System.out, System.err)); + + CommandLine cmd = CommandLine.parse("ps"); + cmd.addArgument("aux", false); + + try { + executor.execute(cmd); + } catch (IOException e) { + LOG.error(e.getMessage(), e); + } + } + + /** Status code matcher */ protected Matcher<? 
super HttpMethodBase> isForbiden() { return responsesWith(403); } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8546666d/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java index 24a1b90..a65ccbc 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java @@ -89,6 +89,17 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi { int sparkVersion = getSparkVersionNumber(note); if (isSparkR() && sparkVersion >= 14) { // sparkr supported from 1.4.0 + // restart spark interpreter + List<InterpreterSetting> settings = + ZeppelinServer.notebook.getBindedInterpreterSettings(note.id()); + + for (InterpreterSetting setting : settings) { + if (setting.getName().equals("spark")) { + ZeppelinServer.notebook.getInterpreterFactory().restart(setting.getId()); + break; + } + } + // run markdown paragraph, again Paragraph p = note.addParagraph(); Map config = p.getConfig(); @@ -136,16 +147,22 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi { Note note = ZeppelinServer.notebook.createNote(null); note.setName("note"); - int sparkVersion = getSparkVersionNumber(note); + int sparkVersionNumber = getSparkVersionNumber(note); - if (isPyspark() && sparkVersion >= 14) { // auto_convert enabled from spark 1.4 + if (isPyspark() && sparkVersionNumber >= 14) { // auto_convert enabled from spark 1.4 // run markdown paragraph, again Paragraph p = note.addParagraph(); Map config = p.getConfig(); config.put("enabled", true); p.setConfig(config); + + String sqlContextName = "sqlContext"; + if (sparkVersionNumber >= 20) { + sqlContextName = "spark"; + } + p.setText("%pyspark\nfrom pyspark.sql.functions import *\n" - + "print(sqlContext.range(0, 10).withColumn('uniform', rand(seed=10) * 3.14).count())"); + + "print(" + sqlContextName + ".range(0, 10).withColumn('uniform', rand(seed=10) * 3.14).count())"); // p.getRepl("org.apache.zeppelin.spark.SparkInterpreter").open(); note.run(p.getId()); waitForFinish(p); @@ -191,8 +208,9 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi { public void pySparkDepLoaderTest() throws IOException { // create new note Note note = ZeppelinServer.notebook.createNote(null); + int sparkVersionNumber = getSparkVersionNumber(note); - if (isPyspark() && getSparkVersionNumber(note) >= 14) { + if (isPyspark() && sparkVersionNumber >= 14) { // restart spark interpreter List<InterpreterSetting> settings = ZeppelinServer.notebook.getBindedInterpreterSettings(note.id()); @@ -221,9 +239,14 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi { // load data using libraries from dep loader Paragraph p1 = note.addParagraph(); p1.setConfig(config); + + String sqlContextName = "sqlContext"; + if (sparkVersionNumber >= 20) { + sqlContextName = "spark"; + } p1.setText("%pyspark\n" + "from pyspark.sql import SQLContext\n" + - "print(sqlContext.read.format('com.databricks.spark.csv')" + + "print(" + sqlContextName + ".read.format('com.databricks.spark.csv')" + ".load('"+ tmpFile.getAbsolutePath() +"').count())"); note.run(p1.getId());
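
For readers following the SparkInterpreter and Utils changes above: the Spark 2 support rests on one reflection pattern, which the standalone sketch below reduces to its essentials. This is illustrative only and not code from the commit; the class name SparkVersionBridge and the simplified error handling are hypothetical, while org.apache.spark.sql.SparkSession and the builder(), master(), appName(), and getOrCreate() methods are the actual Spark 2.x API.

```
// Sketch of the pattern used in SparkInterpreter/Utils above: detect Spark 2.x
// at runtime and build a SparkSession purely through reflection, so a single
// binary can run against Spark 1.x or 2.x without a compile-time dependency
// on Spark 2 classes.
public class SparkVersionBridge {

  // Same check as Utils.isSpark2(): SparkSession exists only in Spark 2.x.
  static boolean isSpark2() {
    try {
      Class.forName("org.apache.spark.sql.SparkSession");
      return true;
    } catch (ClassNotFoundException e) {
      return false;
    }
  }

  // Reflective equivalent of SparkSession.builder().master(m).appName(n).getOrCreate(),
  // i.e. the core of createSparkSession() minus the SparkConf plumbing.
  static Object createSparkSession(String master, String appName) throws Exception {
    Class<?> sessionClass = Class.forName("org.apache.spark.sql.SparkSession");
    // builder() is static, so invoke with a null receiver
    Object builder = sessionClass.getMethod("builder").invoke(null);
    builder = builder.getClass().getMethod("master", String.class).invoke(builder, master);
    builder = builder.getClass().getMethod("appName", String.class).invoke(builder, appName);
    // getOrCreate() returns the existing session or creates a new one
    return builder.getClass().getMethod("getOrCreate").invoke(builder);
  }
}
```

The same dispatch runs through the whole diff: isSpark2() chooses between the SQLContext/SparkContext code paths, and every Spark 2-only call (SparkSession, the relocated HttpServer) goes through Utils.invokeMethod so the classes still load against Spark 1.x.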
