Repository: zeppelin
Updated Branches:
  refs/heads/branch-0.6 fbb9cd6c8 -> c6e42ec7b
ZEPPELIN-1267. PySparkInterpreter doesn't work in spark 2.0

### What is this PR for?
PySparkInterpreter doesn't work in spark 2.0 because pyspark and py4j are not distributed to the executors. This PR extracts the setup steps for the pyspark interpreter into the method setupConfForPySpark and uses it for both spark1 and spark2. This is just a short-term solution, though: I think this should be handled by spark rather than zeppelin, since here zeppelin duplicates part of spark's work. In the long term, I'd like to resolve it in `ZEPPELIN-1263`.

### What type of PR is it?
[Bug Fix]

### Todos
* https://issues.apache.org/jira/browse/ZEPPELIN-1263

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-1267

### How should this be tested?
Verify it manually.

### Screenshots (if appropriate)

### Questions:
* Do the license files need to be updated? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Jeff Zhang <[email protected]>

Closes #1260 from zjffdu/ZEPPELIN-1267 and squashes the following commits:

81d1d56 [Jeff Zhang] ZEPPELIN-1267. PySparkInterpreter doesn't work in spark 2.0

(cherry picked from commit 161dd0efd21e233687d7472d97ccebe86a992582)
Signed-off-by: Mina Lee <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/c6e42ec7
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/c6e42ec7
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/c6e42ec7

Branch: refs/heads/branch-0.6
Commit: c6e42ec7bad39919d14b411215357604c20fc3ac
Parents: fbb9cd6
Author: Jeff Zhang <[email protected]>
Authored: Tue Aug 2 17:41:21 2016 +0800
Committer: Mina Lee <[email protected]>
Committed: Thu Aug 4 17:32:16 2016 +0900

----------------------------------------------------------------------
 .../org/apache/zeppelin/spark/SparkInterpreter.java | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c6e42ec7/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index 39d62e3..dbe3724 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -327,6 +327,7 @@ public class SparkInterpreter extends Interpreter {
       }
     }
 
+    setupConfForPySpark(conf);
     Class SparkSession = Utils.findClass("org.apache.spark.sql.SparkSession");
     Object builder = Utils.invokeStaticMethod(SparkSession, "builder");
     Utils.invokeMethod(builder, "config", new Class[]{ SparkConf.class }, new Object[]{ conf });
@@ -440,8 +441,12 @@ public class SparkInterpreter extends Interpreter {
         conf.set(key, val);
       }
     }
+    setupConfForPySpark(conf);
+    SparkContext sparkContext = new SparkContext(conf);
+    return sparkContext;
+  }
 
-    //TODO(jongyoul): Move these codes into PySparkInterpreter.java
+  private void setupConfForPySpark(SparkConf conf) {
     String pysparkBasePath = getSystemDefault("SPARK_HOME", null, null);
     File pysparkPath;
     if (null == pysparkBasePath) {
@@ -454,7 +459,8 @@
     }
 
     //Only one of py4j-0.9-src.zip and py4j-0.8.2.1-src.zip should exist
-    String[] pythonLibs = new String[]{"pyspark.zip", "py4j-0.9-src.zip", "py4j-0.8.2.1-src.zip"};
+    String[] pythonLibs = new String[]{"pyspark.zip", "py4j-0.9-src.zip", "py4j-0.8.2.1-src.zip",
+      "py4j-0.10.1-src.zip"};
     ArrayList<String> pythonLibUris = new ArrayList<>();
     for (String lib : pythonLibs) {
       File libFile = new File(pysparkPath, lib);
@@ -484,9 +490,6 @@
     if (getProperty("master").equals("yarn-client")) {
       conf.set("spark.yarn.isPython", "true");
     }
-
-    SparkContext sparkContext = new SparkContext(conf);
-    return sparkContext;
   }
 
   static final String toString(Object o) {
